Operations

Basic I/O

To see how the various components interact, we first examine basic get and put operations for the simple case of a non-composite Dataset. We assume that the Butler is configured with an external Registry and Datastore, each consisting of a client-server pair.

Basic get

The user has a DatasetLabel, constructed or obtained by a query, and wishes to retrieve the associated InMemoryDataset.

This proceeds through the following steps:

  1. User calls: butler.get(label).
  2. Butler forwards this call to its Registry, adding the Collection (from the Run) it was configured with (i.e. butler.registry.find(self.run.collection, label)).
  3. Registry performs the lookup on the server using SQL and returns the URI for the stored Dataset (via a DatasetHandle).
  4. Butler forwards the request, with both the URI and the StorageClass, to the Datastore client (i.e. butler.datastore.get(handle.uri, handle.type.storageClass)).
  5. Datastore client requests a serialized version of the Dataset from the server using the URI.
  6. Using the StorageClass to determine the appropriate deserialization function, the Datastore client then materializes the InMemoryDataset and returns it to the Butler.
  7. Butler then returns the InMemoryDataset to the user.
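
A minimal sketch of this flow as a Butler.get implementation, assuming the interfaces named in the steps above (the exact signatures are illustrative, not definitive):

# Hedged sketch of Butler.get for a non-composite Dataset; the method calls
# follow the steps above, but details of the signatures are assumptions.
def get(self, label):
    # Steps 2-3: resolve the label to a DatasetHandle via the Registry.
    handle = self.registry.find(self.run.collection, label)
    # Steps 4-6: let the Datastore client fetch and deserialize the Dataset.
    inMemoryDataset = self.datastore.get(handle.uri, handle.type.storageClass)
    # Step 7: hand the InMemoryDataset back to the caller.
    return inMemoryDataset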

See the API documentation for more information.

Note

  • The Datastore request can be a simple HTTP GET request for a stored FITS file, or something more complicated. In the former case the materialization would be a simple FITS read (e.g. of a calexp), with the reader determined by the StorageClass retrieved from the Registry.
  • The serialized version sent over the wire doesn’t have to correspond to the format stored on disk in the Datastore server. It just needs to be serialized in the form expected by the client.

Basic put

The user has an InMemoryDataset and wishes to store it at a particular DatasetLabel.

This proceeds through the following steps:

  1. User calls: butler.put(label, inMemoryDataset).
  2. Butler expands the DatasetLabel into a full DatasetRef using the Registry, by calling datasetRef = butler.registry.expand(label).
  3. Butler obtains a StorageHint by calling storageHint = datasetRef.makeStorageHint(self.run).
  4. Butler then asks the Datastore client to store the file by calling: butler.datastore.put(inMemoryDataset, datasetRef.type.storageClass, storageHint, datasetRef.type.name).
  5. The Datastore client then uses the serialization function associated with the StorageClass to serialize the InMemoryDataset and sends it to the Datastore server. Depending on the type of server, it either receives the actual URI back or generates the URI itself on the client side.
  6. Datastore returns the actual URI to the Butler.
  7. Butler calls the Registry function addDataset to add the Dataset.
  8. Butler returns a DatasetHandle to the user.
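
Similarly, a minimal sketch of Butler.put built from these steps (the argument order for Registry.addDataset follows the transfer example later in this section; other details are assumptions):

# Hedged sketch of Butler.put for a non-composite Dataset; signatures are
# assumptions based on the steps above.
def put(self, label, inMemoryDataset):
    # Step 2: expand the label into a full DatasetRef.
    datasetRef = self.registry.expand(label)
    # Step 3: derive the StorageHint for this Run.
    storageHint = datasetRef.makeStorageHint(self.run)
    # Steps 4-6: serialize and store via the Datastore client; put returns the
    # URI (and component URIs, empty here for a non-composite Dataset).
    uri, components = self.datastore.put(inMemoryDataset,
                                         datasetRef.type.storageClass,
                                         storageHint, datasetRef.type.name)
    # Step 7: record the new Dataset in the Registry (producer omitted).
    handle = self.registry.addDataset(datasetRef, uri, components, None, self.run)
    # Step 8: return the DatasetHandle (assuming addDataset returns it).
    return handle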

See the API documentation for more information.

Composites

Warning

The way composites work is likely soon to be redesigned.

A Dataset can be composite, in which case it consists of a parent Dataset and one or more child Datasets. An example would be an Exposure which includes a Wcs, a Mask, and an Image (as well as other components). There are several ways this may be stored by the Datastore:

  • As part of the parent Dataset (e.g. the full Exposure is written to a single FITS file).
  • As a set of entities without a parent (e.g. only the Wcs, Mask and Image are written separately and the Exposure needs to be composed from them).
  • As a mix of the two extremes (e.g. the Mask and Image are part of the Exposure file but the Wcs is written to a separate file).

In each case the user expects to be able to read an individual component, and when the components are stored separately the transfer should be efficient.

In addition, it is desirable to be able to override parts of a composite Dataset (e.g. updated metadata), by defining a new DatasetType that mixes components from the original Dataset with new ones.

To support this the Registry is also responsible for storing the component Datasets of the composite.

The DatasetHandle returned by Registry.find() therefore not only includes the URI and StorageClass of the parent (associated with the DatasetRef), but also a components dictionary of name : DatasetHandle specifying its children.

The Butler retrieves all Datasets from the Datastore as InMemoryDatasets and then calls the assemble function associated with the StorageClass of the primary to create the final composed InMemoryDataset.
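
A hedged sketch of how Butler.get might handle a composite (handle.components follows the description above, while the assemble signature and the treatment of a parent with no stored form of its own are assumptions):

# Hedged sketch of composite retrieval; details are illustrative only.
def getComposite(self, handle):
    # Retrieve each child Dataset from the Datastore.
    children = {name: self.datastore.get(child.uri, child.type.storageClass)
                for name, child in handle.components.items()}
    # Retrieve the parent itself, if it has a stored form of its own.
    parent = (self.datastore.get(handle.uri, handle.type.storageClass)
              if handle.uri is not None else None)
    # Let the StorageClass of the parent compose the final InMemoryDataset.
    return handle.type.storageClass.assemble(parent, children)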

This process is most easily understood by reading the API documentation for butler.get and butler.put.

Transferring Registries and Datastores

A user has a Butler instance that holds a Registry client instance and a Datastore client instance, both connected to their remote server equivalents. Now the user wants to obtain a local subset of the upstream Datasets (and all related DataUnits, DatasetTypes and possibly Quanta and Collections) held by the Registry.

There are three cases:

We will ignore the last one for now, because it is effectively a kind of caching, and focus on the first two instead.

While no high-level API for transfers exists in the current design, it is relatively easy to implement on top of the provided low-level API.

transfer(dst, src, expr, collection, copyDatasets=False)

Transfer Datasets and related entities between Butlers.

Parameters:
  • dst (Butler) – Butler instance of destination.
  • src (Butler) – Butler instance of source.
  • expr (str) – an expression (SQL query that evaluates to a list of dataset_id) that selects the Datasets.
  • collection (str) – a Collection used to identify the requested transferred Datasets in the Registry of the destination Butler.
  • copyDatasets (bool) – Should the Datasets be copied from the source to the destination Datastore?

A possible implementation could be:

def transfer(dst, src, expr, collection, copyDatasets=False):
    dst.registry.transfer(src.registry, expr, collection)

    if copyDatasets:
        for label in dst.query(
            # get DatasetLabels for all Datasets in collection
            ):

            ref = dst.registry.expand(label)
            template = dst.config.templates.get(ref.type.name, None)
            storageHint = ref.makeStorageHint(dst.config.outputCollection, template)
            handle = src.registry.find(collection, label)

            uri, components = dst.datastore.transfer(src.datastore, handle.uri, ref.type.storageClass, storageHint, ref.type.name)
            dst.registry.addDataset(ref, uri, components, handle.producer, handle.run)
    else:
        # The following assumes the old datastore was empty and that the datastore
        # will be read-only.  Otherwise we would have to do some chaining.
        dst.datastore = src.datastore

Todo

This is just a draft implementation to show that the interfaces enable transfer to be written. However, there are many remaining details to be worked out, such as:

  • What should happen if the Dataset composition is different in the output datastore?
  • How exactly to implement Datastore chaining?
  • How to make this transactionally safe?
  • At what place in the component hierarchy should the high-level transfer be implemented, given that it is effectively a double-dispatch problem?

Once these details have been worked out the high-level transfer should become part of the API.

Note

Depending on the ability to join user tables to data release tables in the science platform, transfers between butlers may or may not be common.

Remote Access and Caching

The user has a Butler instance. This Butler instance holds a local Registry client instance that is connected to a remote read-only Registry server (database). It also holds a local Datastore client that is connected to a remote Datastore.

The user now calls butler.get() to obtain an InMemoryDataset from the Datastore, proceeds with some further processing, and subsequently wants to load the same InMemoryDataset again.

This is most easily supported by a pass-through caching Datastore. The Butler now holds an instance of the caching Datastore instead. The caching Datastore in turn holds the client to the remote Datastore.

digraph ButlerWithDatastoreCache {
node[shape=record]
edge[dir=back, arrowtail=empty]

Butler -> ButlerConfiguration [arrowtail=odiamond];
Butler -> DatastoreCache [arrowtail=odiamond];
DatastoreCache -> Datastore [arrowtail=odiamond];
Butler -> Registry [arrowtail=odiamond];
}

A trivial implementation, for a non-persistent cache, could be:

class DatastoreCache
cache

A dictionary of {(URI, parameters) : InMemoryDataset}.

datastore

The chained Datastore.

__init__(datastore)

Initialize with chained Datastore.

get(uri, parameters=None)

Implemented as:

def get(self, uri, parameters=None):
    if (uri, parameters) not in self.cache:
        self.cache[(uri, parameters)] = self.datastore.get(uri, parameters)

    return self.cache[(uri, parameters)]

put(inMemoryDataset, storageClass, storageHint, typeName=None) → URI, {name: URI}

Direct forward to self.datastore.put.

transfer(inputDatastore, inputUri, storageClass, storageHint, typeName=None) → URI, {name: URI}

Direct forward to self.datastore.transfer.
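
For completeness, these two forwarding methods might be written as follows (a sketch matching the signatures documented above):

def put(self, inMemoryDataset, storageClass, storageHint, typeName=None):
    # Writes bypass the cache and go straight to the chained Datastore.
    return self.datastore.put(inMemoryDataset, storageClass, storageHint, typeName)

def transfer(self, inputDatastore, inputUri, storageClass, storageHint, typeName=None):
    # Transfers are likewise forwarded unchanged.
    return self.datastore.transfer(inputDatastore, inputUri, storageClass,
                                   storageHint, typeName)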

Todo

  • What to do when parameters differ? Should we re-slice?
  • Work out how persistable caches should be implemented.

Note

Caching is fundamentally different from Transferring Registries and Datastores in that it does not modify the Registry at all. This makes it a much more lightweight operation when the input Registry is read-only (and only read-only access is needed), but it means the Registry cannot be used to obtain the local storageHint to the cached files for use by external tools.

SuperTask Pre-Flight and Execution

Warning

The exact operation of SuperTask pre-flight is currently being redesigned. But this section may still be helpful in understanding the issues involved.

Note

This description currently has the SuperTask control code operating directly on Registry and Datastore objects instead of Butlers. Actual SuperTasks, of course, still only see a Butler. But we should decide when the design is more mature whether to hide the interfaces the control code uses behind Butler as well.

Preflight

The inputs to SuperTask preflight are considered here to be:

  • an input Registry instance (may be read-only)
  • an input Datastore instance (may be read-only)
  • an output Registry instance (may be the same as the input Registry, but must not be read-only)
  • an output Datastore instance (may be the same as the input Datastore, but must not be read-only)
  • a Pipeline (contains SuperTasks, configuration, and the set of DatasetTypes needed as inputs and expected as outputs)
  • a user expression that limits the DataUnits to process.
  • an ordered list of Collections from which to obtain inputs
  • a Collection that labels the processing run.

Todo

In order to construct the SuperTasks in a Pipeline (and extract the DatasetTypes), we need to pass the SuperTask constructors a Butler or some other way to load the schemas of any catalogs they will use as input datasets. These may differ between collections!

  1. Preflight begins with the activator calling Registry.makeDataGraph with the given expression, list of input collections, and the sets of DatasetTypes implicit in the Pipeline. The returned QuantumGraph contains both the full set of input Datasets that may be required and the full set of DataUnits that will be used to describe any future Datasets.

  2. If the output Registry is not the same as the input Registry, the activator transfers (see Transferring Registries and Datastores) all Registry content associated with the Datasets in the graph to the output Registry. The input Datasets themselves may be transferred to the output Datastore at the same time if this will make subsequent processing more efficient.

  3. The activator calls Registry.makeRun() on the output Registry with the output Collection, obtaining a Run instance.

  4. The activator adds all input Datasets to the Run’s Collection (in the Registry; this does not affect the Datastore at all). Note that from this point forward, we need only work with a single Collection, as we have aggregated everything relevant from the multiple input Collections into a single input/output Collection.

  5. The activator constructs a Butler from the output Registry (which can now also be used as input), the Run’s Collection, and either the given Datastore (if the same one is used for input and output) or a pass-through Datastore that forwards input and output requests to the two given ones appropriately.

  6. The activator records the Pipeline configuration and a description of the software environment (as regular Datasets) using the Butler and associates them with the Run by calling Registry.updateRun().

  7. The activator calls defineQuanta on each of the SuperTasks in the Pipeline, passing them the Run and the QuantumGraph. Each SuperTask manipulates the QuantumGraph to add its Quanta and output DatasetRefs to it.

    Note

    This differs slightly from the SuperTask design in DMTN-055, in which SuperTasks return unstructured lists of Quanta and the activator assembles them into a graph.

After these steps, the QuantumGraph contains a complete description of the processing to be run, with each Quantum it holds having complete predictedInputs and outputs lists. The QuantumGraph can then be serialized or otherwise transferred to a workflow system to schedule execution.

At the end of preflight, the only modifications that have been made to the output Registry are the addition of a Run, the association of all input Datasets with the Run’s Collection, and the addition of Datasets recording the configuration and software environment. Those two Datasets are the only modifications to the output Datastore.
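
A hedged sketch of the activator-side preflight sequence described above. The Registry and SuperTask method names are taken from the steps; the Pipeline accessors, graph.datasets, Registry.associate, and the Butler constructor arguments are assumptions, not settled interfaces:

# Hedged sketch of preflight; the calls follow the numbered steps above, but
# the exact signatures and several helper accessors are assumptions.
def preflight(inputRegistry, outputRegistry, datastore, pipeline, expr,
              inputCollections, outputCollection):
    # Step 1: build the initial QuantumGraph from the input Registry.
    graph = inputRegistry.makeDataGraph(expr, inputCollections,
                                        pipeline.neededDatasetTypes,
                                        pipeline.futureDatasetTypes)
    # Step 2 (transferring content to a distinct output Registry/Datastore)
    # is omitted here for brevity.
    # Step 3: create the Run that labels this processing.
    run = outputRegistry.makeRun(outputCollection)
    # Step 4: associate all input Datasets with the Run's Collection
    # (Registry-only bookkeeping; the Datastore is untouched).
    for handle in graph.datasets:
        outputRegistry.associate(run.collection, handle)
    # Step 5: construct the Butler the SuperTasks will see.
    butler = Butler(registry=outputRegistry, datastore=datastore, run=run)
    # Step 6: record configuration and software environment as Datasets via
    # the Butler, then call outputRegistry.updateRun(run) (omitted).
    # Step 7: let each SuperTask add its Quanta and output DatasetRefs.
    for task in pipeline:
        task.defineQuanta(run, graph)
    return graph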

Todo

May want to try a few examples of defineQuanta implementations, perhaps covering applying master calibrations and making coadds.

Building Preflight Queries

The call to Registry.makeDataGraph() at the start of Preflight hides a great deal of complexity that is central to how the Registry schema supports SuperTask Preflight. The implementation of makeDataGraph is responsible for generating a complex SQL query, interpreting the results, and packaging them into a data structure (a QuantumGraph with a DataUnitMap) that can be queried and extended by SuperTask.defineQuanta.

The query generated by Registry.makeDataGraph() is built by combining a machine-generated output field clause, a machine-generated FROM clause, a machine-generated partial WHERE clause, and a supplemental partial WHERE clause provided by the user (the “expression” described above).

As an example, we’ll consider the case where we are building coadds, which means we’re combining warp Datasets to build coadd Datasets. The DataUnit types associated with warp are Visit, PhysicalFilter, Camera, Patch, Tract, and SkyMap, while those associated with coadd are Patch, Tract, SkyMap, and AbstractFilter.

It’s worth noting that of these, only Visit and Patch are needed to fully identify a warp and only Patch and AbstractFilter are needed to identify a coadd; all of the other DataUnit types are uniquely identified as foreign key targets of these.

Because the Pipeline we’re running starts with warps produced in another processing run, warp will be the only element in the neededDatasetTypes argument and coadd will be the only element in the futureDatasetTypes argument.

The process starts by extracting the DataUnit types from both the neededDatasetTypes and futureDatasetTypes arguments to makeDataGraph, and removing duplicates. Python code to do that looks something like this:

unitTypes = []
for datasetType in neededDatasetTypes:
    unitTypes.extend(datasetType.units)
for datasetType in futureDatasetTypes:
    unitTypes.extend(datasetType.units)
unitTypes = DataUnitTypeSet(unitTypes)  # removes duplicates

In our coaddition example, unitTypes == (Visit, PhysicalFilter, Camera, Patch, Tract, SkyMap, AbstractFilter).

We add the tables for all of these DataUnit types to the FROM clause, with inner joins between all of them, and add their “value” fields to the field list. Our example query now looks like this:

SELECT
    Visit.visit_number,
    PhysicalFilter.physical_filter_name,
    Camera.camera_name,
    Patch.patch_index,
    Tract.tract_number,
    SkyMap.skymap_name,
    AbstractFilter.abstract_filter_name
FROM
    Visit
    INNER JOIN PhysicalFilter
    INNER JOIN Camera
    INNER JOIN Patch
    INNER JOIN Tract
    INNER JOIN SkyMap
    INNER JOIN AbstractFilter

We’ll add the join restrictions later as part of the WHERE clause instead of via ON clauses. Using ON is certainly possible and may be advisable in an actual implementation, but it makes the logic a bit harder to follow.

Some of the join restrictions are simple; they’re just the foreign keys in the tables we’ve included. The remaining join restrictions between the DataUnits involve bringing in the many-to-many join tables between DataUnits. We simply include any join table that corresponds to any pair of DataUnit types in the full list. That appends the following to our SQL statement:

-- ...everything in the previous SQL snippet...
    INNER JOIN VisitPatchJoin
WHERE
    Visit.physical_filter_name = PhysicalFilter.physical_filter_name
        AND
    Visit.camera_name = Camera.camera_name
        AND
    PhysicalFilter.camera_name = Camera.camera_name
        AND
    Patch.tract_number = Tract.tract_number
        AND
    Patch.skymap_name = SkyMap.skymap_name
        AND
    Tract.skymap_name = SkyMap.skymap_name
        AND
    PhysicalFilter.abstract_filter_name = AbstractFilter.abstract_filter_name
        AND
    VisitPatchJoin.visit_number = Visit.visit_number
        AND
    VisitPatchJoin.camera_name = Visit.camera_name
        AND
    VisitPatchJoin.patch_index = Patch.patch_index
        AND
    VisitPatchJoin.tract_number = Patch.tract_number
        AND
    VisitPatchJoin.skymap_name = Patch.skymap_name

Todo

That last statement in the text is a small lie; we don’t want to bring in the VisitTractJoin table even though both Visit and Tract are in our list, because it’s redundant with VisitPatchJoin. That’s not hard to fix; we just need a rule that says never to include a join table when another included join table already makes it redundant, and to define that hierarchy in the concrete DataUnit reference sections.

This query already produces the table of DataUnit primary key values we’d need to construct a DataUnitMap, which is one of the most important components of the QuantumGraph we’ll pass to SuperTask.defineQuanta. But it currently covers the full “universe” of possible coadds: any known Visit that overlaps any known Patch is included. We want to filter this in two ways:

  • we need to apply the user’s filter expression;
  • we need to only consider warps that already exist in the Collection(s) we’re using as inputs.

We’ll start with the first one, because it’s easy: we just append the user expression to the end of the WHERE clause with an extra AND, wrapping it in parentheses. That provides a very straightforward definition of what the user expression is: any valid SQL boolean expression that utilizes any of the DataUnit tables implied by the Pipeline. Some examples:

  • Make coadds for any patches and filters that involve a range of HSC visits:

    (Visit.visit_number BETWEEN 500 AND 700)
        AND
    Camera.camera_name = 'HSC'
        AND
    SkyMap.skymap_name = 'SSP-WIDE'
    
  • Make an r-band coadd for a specific patch, using any available data from HSC and CFHT:

    Tract.tract_number = 23
        AND
    Patch.patch_index = 56
        AND
    SkyMap.skymap_name = 'SSP-WIDE'
        AND
    AbstractFilter.abstract_filter_name = 'r'
        AND
    (Camera.camera_name = 'HSC' OR Camera.camera_name = 'CFHT')
    
  • Make all coadds with data taken after a certain date:

    Visit.obs_begin > '2017-10-14'
        AND
    Camera.camera_name = 'HSC'
        AND
    SkyMap.skymap_name = 'SSP-WIDE'
    

A few things stand out:

  • It’s almost always necessary to provide both the camera name and the skymap name. We could imagine having the higher-level activator code provide defaults for these so the user doesn’t always have to include them explicitly.
  • The expressions can get quite verbose, as there’s a lot of redundancy between the table names and the field names. We might be able to eliminate much of that via a regular expression or other string substitution that transforms any comparison on a DataUnit type name (e.g. Visit = 500) into a comparison on its “value” field (e.g. Visit.visit_number = 500); a sketch of such a substitution follows this list.
  • We can’t (currently) filter on DataUnits that aren’t utilized by the DatasetTypes produced or consumed by the Pipeline. That makes it impossible to e.g. filter on Tract if you’re just running a single-visit processing Pipeline. This is not a fundamental limitation, though; we just need to find some way for the user to declare in advance what additional DataUnits their expression will use. It’d be best if we could infer that by actually parsing their expression, but if that’s hard we could just make them declare the extra DataUnits explicitly to the activator.
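
As an illustration of the second point above, a minimal substitution along those lines might look like the following (the VALUE_FIELDS mapping and the expandUnitNames function are hypothetical illustrations, not part of the design):

import re

# Hypothetical mapping from DataUnit type name to its "value" field; in a real
# implementation this would come from the DataUnit definitions themselves.
VALUE_FIELDS = {
    "Visit": "Visit.visit_number",
    "PhysicalFilter": "PhysicalFilter.physical_filter_name",
    "Camera": "Camera.camera_name",
    "Patch": "Patch.patch_index",
    "Tract": "Tract.tract_number",
    "SkyMap": "SkyMap.skymap_name",
    "AbstractFilter": "AbstractFilter.abstract_filter_name",
}

def expandUnitNames(expression):
    # Replace a bare DataUnit type name (e.g. "Visit = 500") with its value
    # field ("Visit.visit_number = 500"), leaving qualified names untouched.
    pattern = re.compile(r"\b(%s)\b(?!\.)" % "|".join(VALUE_FIELDS))
    return pattern.sub(lambda m: VALUE_FIELDS[m.group(1)], expression)

# expandUnitNames("Visit BETWEEN 500 AND 700 AND Camera.camera_name = 'HSC'")
# -> "Visit.visit_number BETWEEN 500 AND 700 AND Camera.camera_name = 'HSC'"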

To restrict the query to DataUnits associated with already-existing input data (warps, in this case), we iterate over the DatasetTypes in the neededDatasetTypes list and, for each DatasetType, add its Dataset entry (aliased to the DatasetType name), the Dataset-DataUnit join tables, and the DatasetCollections table to the FROM clause; add its dataset_id and registry_id to the SELECT field list; and add the corresponding join and Collection restrictions to the WHERE clause.

In the coaddition example, that makes our full query (now completed):

SELECT
    Visit.visit_number,
    PhysicalFilter.physical_filter_name,
    Camera.camera_name,
    Patch.patch_index,
    Tract.tract_number,
    SkyMap.skymap_name,
    AbstractFilter.abstract_filter_name,
    warp.dataset_id AS warp_dataset_id,
    warp.registry_id AS warp_registry_id
FROM
    Visit
    INNER JOIN PhysicalFilter
    INNER JOIN Camera
    INNER JOIN Patch
    INNER JOIN Tract
    INNER JOIN SkyMap
    INNER JOIN AbstractFilter
    INNER JOIN VisitPatchJoin
    INNER JOIN Dataset AS warp
    INNER JOIN DatasetVisitJoin AS warpVisitJoin
    INNER JOIN DatasetPatchJoin AS warpPatchJoin
    INNER JOIN DatasetCollections AS warpCollections
WHERE
    Visit.physical_filter_name = PhysicalFilter.physical_filter_name
        AND
    Visit.camera_name = Camera.camera_name
        AND
    PhysicalFilter.camera_name = Camera.camera_name
        AND
    Patch.tract_number = Tract.tract_number
        AND
    Patch.skymap_name = SkyMap.skymap_name
        AND
    Tract.skymap_name = SkyMap.skymap_name
        AND
    PhysicalFilter.abstract_filter_name = AbstractFilter.abstract_filter_name
        AND
    VisitPatchJoin.visit_number = Visit.visit_number
        AND
    VisitPatchJoin.camera_name = Visit.camera_name
        AND
    VisitPatchJoin.patch_index = Patch.patch_index
        AND
    VisitPatchJoin.tract_number = Patch.tract_number
        AND
    VisitPatchJoin.skymap_name = Patch.skymap_name
        AND
    warp.dataset_type_name = 'warp'
        AND
    warp.dataset_id = warpVisitJoin.dataset_id
        AND
    warp.registry_id = warpVisitJoin.registry_id
        AND
    warpVisitJoin.visit_number = Visit.visit_number
        AND
    warpVisitJoin.camera_name = Visit.camera_name
        AND
    warp.dataset_id = warpPatchJoin.dataset_id
        AND
    warp.registry_id = warpPatchJoin.registry_id
        AND
    warpPatchJoin.patch_index = Patch.patch_index
        AND
    warpPatchJoin.tract_number = Patch.tract_number
        AND
    warpPatchJoin.skymap_name = Patch.skymap_name
        AND
    warpCollections.dataset_id = warp.dataset_id
        AND
    warpCollections.registry_id = warp.registry_id
        AND
    warpCollections.collection = ($USER_TAG)
        AND
    ($USER_EXPRESSION)
;

Note

The example above demonstrates using only a single Collection. Handling multiple Collections is quite a bit trickier. It can obviously be accomplished with temporary tables, views, or subqueries that create a de-duplicated list of Datasets for each DatasetType across all given Collections before joining them into the main query. It is not clear whether it can be accomplished directly within a single query with no subqueries.

Adding the Dataset fields to the SELECT field list is clearly unnecessary for constraining the query; that all happens in the WHERE clause. What these fields do is identify the set of input Datasets that will be used by the processing. In this example, each row has a unique (compound) warp ID, but that’s not always true; to be safe in general, duplicates will have to be removed.

As written, this query doesn’t pull down everything about the Datasets. Including all of the fields that describe a Dataset in the same query is clearly possible (albeit a bit tricky in the case of composites), but it’s not obviously more efficient than running smaller follow-up queries to get the extra Dataset fields when the original query may have a lot of duplicates.

We actually face the same problem for the extra fields associated with the DataUnits: our query so far generates all of the primary key values and relationship information we’ll need, but we will have to follow it up with later queries to fill in the extra fields. As with the Datasets, we could instead add those extra fields to the main query, but doing so will in general involve a lot of duplicate values.

We will assume for now that we’ll leave the main query as-is and use follow-up queries to expand its results into a list of DatasetHandles that we can add to the QuantumGraph. As noted above, the DataUnit primary keys from the main query are sufficient to construct a DataUnitMap to attach to it, and the implementation of Registry.makeDataGraph() is complete.

Fine-Grained Input Control

The collections and expression passed to Registry.makeDataGraph() provide a level of control over processing inputs that should be sufficient for most SuperTask execution invoked by developers and science users. That level of control may not be sufficient for production operators; though in most cases the real problem is not that the control operators require is impossible, but that exercising it through these mechanisms may be unpleasant or inconvenient.

That’s because the Collection system is already extremely flexible. As long as an operator is permitted to apply Collections to Datasets in the database that backs a Registry (which need not involve going through the Registry interface), they can create a Collection including (and, more importantly, not including) any Datasets they’d like, whether that selection is generated by one or more SQL queries, external programs, or human inspection. This mechanism should be strongly considered as at least part of any implementation of a fine-grained control use case before we add additional logic to Registry.makeDataGraph(). We will, after all, be adding all input data to the Collection associated with each Run during the course of preflight anyway, and it is perfectly acceptable to do this prior to preflight and then use that existing Collection to label the Run (making the later assignment of the input data to that Collection a no-op).

Two types of fine-grained control stand out as being difficult (perhaps impossible) to handle with just Collections:

  • blacklists that apply to only some processing steps, not all of them;
  • manual alterations of the relationships between raw science images and master calibration Datasets.

The current system could easily be extended to support these use cases in other ways, however:

  • Blacklisting that only applies to a single SuperTask could be implemented as a blacklist Dataset (possibly a database-backed one) that is passed to the SuperTask’s defineQuanta method and applied there. This would require adding some mechanism for passing Datasets to defineQuanta without permitting SuperTasks to load arbitrary Datasets at that stage.
  • Manual alterations of calibration product relationships could be implemented by creating a new set of VisitRange DataUnits and assigning them to new Datasets, in a new Collection, whose URIs are taken from existing Datasets. We’d need to think through the implications of having multiple Datasets with the same URIs, and we’d certainly need some new high-level code to make this easy to do.

This does not rule out adding new logic and arguments to Registry.makeDataGraph() to meet fine-grained input control requirements, of course, and it is also possible that we could let operators write by hand the entire query that makeDataGraph would otherwise generate. The complexity of those queries makes writing them from scratch a significant ask, however, so it might be better to let operators modify a generated query instead. That would generally involve editing only the FROM and WHERE clauses, as downstream code that interprets the query results requires the field list to remain unchanged.

Because a single query is used to define the inputs for all processing steps, however, even manual control over the query would not permit operators to control which inputs are used in different steps independently. Complete operator control over that would probably have to involve generating Quanta to pass to SuperTask.runQuantum manually, without calling defineQuanta or other standard preflight code at all. While probably possible (and perhaps not even too difficult) for a fixed Pipeline, this would make it harder to propagate changes to the Pipeline into the production system. It also raises a fundamental philosophical question about the degree of determinism (vs. runtime flexibility) we expect from a particular release of Science Pipelines code, because it makes it impossible to guarantee that input-selection logic will be the same in production as it was in development.

Direct Execution

This section describes executing SuperTasks in an environment in which the same output Registry and Datastore used for preflight are directly accessible to the worker processes. See Shared-Nothing Execution for SuperTask execution in an environment where workers cannot access the Datastore or the output Registry.

  1. The activator constructs an input/output Butler with the same Registry and Datastore used in preflight.

  2. The activator loops over all Quanta it has been assigned by the workflow system. For each one, it:

    1. adds the Quantum to the Registry by calling Registry.addQuantum(). This stores the predictedInputs provenance in the Registry;
    2. transforms all predictedInputs DatasetRefs into DatasetHandles, allowing the control code to test whether all needed inputs are present before actually invoking SuperTask code;
    3. calls SuperTask.runQuantum with the Quantum instance and the Butler instance. The SuperTask calls Butler.get() (using the DatasetRefs in Quantum.predictedInputs) to obtain its inputs, and indicates the ones it actually utilizes by calling Butler.markInputUsed(). Outputs are saved with Butler.put(), which is passed the Quantum instance to automatically record outputs provenance.
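
A hedged sketch of that per-Quantum loop; the signatures are assumptions, and we assume Registry.find returns None when no matching Dataset exists:

# Hedged sketch of the activator's execution loop for one batch of Quanta.
for quantum in assignedQuanta:
    # (a) Record the Quantum and its predictedInputs provenance.
    registry.addQuantum(quantum)
    # (b) Resolve predictedInputs to DatasetHandles, checking they all exist
    #     before any SuperTask code is invoked.
    handles = [registry.find(run.collection, ref)
               for ref in quantum.predictedInputs]
    if any(handle is None for handle in handles):
        raise RuntimeError("missing predicted input(s) for this Quantum")
    # (c) Invoke the SuperTask associated with this Quantum (lookup omitted);
    #     it reads inputs with Butler.get, flags them with Butler.markInputUsed,
    #     and writes outputs with Butler.put, passing the Quantum for provenance.
    task.runQuantum(quantum, butler)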

If the SuperTask throws an exception or otherwise experiences a fatal error, the Quantum that defined its execution will thus already have been added to the Registry with as much information as possible about its inputs and outputs, maximizing its usefulness in debugging the failure.

Shared-Nothing Execution

The LSST Batch Production Service plans to use worker nodes that cannot connect to the central Registry database and Datastore that provide long-term management and storage of their inputs and outputs. Instead, after Preflight, copies of all predictedInput Datasets for the Quanta to be executed on a node will be transferred to local scratch space in advance. After execution, outputs will be transferred back and ingested into the permanent storage system.

These transfers may be executed by external code that accesses the internals of the permanent Datastore and Registry, and one likely requirement of these tools is that Registry-wide unique filenames for both inputs and outputs be known in advance. Because the design described in this document does not deal directly with filenames (which are implementation details of some Datastores), we will assume here that the StorageHint for a Dataset (which is by construction unique across a Registry) is the filename used by the Datastore. This assumption also implies that in this case we can construct a URI directly from a StorageHint (whereas this would usually be a private transformation defined by a Datastore). Because the templates used to generate StorageHints are defined by the Registry, there is no practical loss of generality from this assumption.

The QuantumGraph generated by Preflight holds DatasetRef instances for all input Datasets that may be required and all output Datasets that may be produced. An invocation of Preflight is also associated with exactly one Run, and this provides enough information to obtain the unique storageHints of all Datasets: we simply call makeStorageHint(run) on each DatasetRef in the graph. External code can then transfer the files from permanent storage to local scratch, maintaining the unique filenames within a new directory.
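
For example, the per-Dataset filenames might be computed as follows (graph.datasetRefs is a hypothetical accessor for all DatasetRefs in the QuantumGraph; run is the Run from Preflight):

# Hedged sketch: derive the unique filenames (StorageHints) for all Datasets
# in the QuantumGraph; ``graph.datasetRefs`` is an assumed accessor.
storageHints = {ref: ref.makeStorageHint(run) for ref in graph.datasetRefs}
# External tooling can then copy each file from permanent storage into a
# per-node scratch directory under the same relative path.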

To execute SuperTask code on the scratch space, we need to construct a Butler, and that means constructing a Registry and Datastore. The Datastore is simple: it reads and writes to a local POSIX filesystem that uses the same StorageHint/filename-based URIs as the permanent Datastore.

For the Registry, we will use a limited Registry, which uses a simple set of dictionary-like mappings instead of a full SQL database. The content needed to construct it can be saved to a file by calling export() on the master Registry with the subset of the QuantumGraph to be processed on the node as the only argument. The exported Registry contents can then be transferred to the node like any other file.
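
A heavily hedged sketch of preparing that file for one node; the form of export()'s return value and the file handling are assumptions, not specified interfaces:

# Hedged sketch: export the limited-Registry content for one worker node.
# The return type of export() and the serialization are assumptions.
content = masterRegistry.export(nodeSubgraph)   # subset of the QuantumGraph
with open("node_registry.dat", "wb") as outfile:
    outfile.write(content)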

SuperTask execution then proceeds on the worker node as described in Direct Execution, but with a Butler initialized with the local limited Registry and simple Datastore. The local Registry is still responsible for recording provenance information provided by the SuperTask(s) and activator.

After execution has completed on the node, we use Registry.export() on the local Registry, this time to dump its entire contents to a file. This includes all provenance information and the list of Datasets, though the “intrusive” actualInputs provenance is the only information that could not instead be inferred from the transfer system’s own records and the set of files in the directory.

Because the current plan is for APIs outside the scope of this document to be used when ingesting provenance information and output Datasets into persistent storage, the only additional requirement on the interfaces described here is that the limited Registry export file format either be readable by the external code or easily convertible to a format that is.

Todo

Once our interfaces for transfers between Datastores are more mature, we should be able to implement this operation using just the APIs described in this document, without assuming anything about Datastores that use unique filenames (though doing so may be a significant optimization). Even if that’s not something the LDF production system ever uses, it might provide an easier way to set up mini-production systems with shared-nothing nodes at other locations.

Running Comparison SuperTasks

SuperTasks that compare their input Datasets, and hence wish to access Datasets with the same DataUnits in different Collections, were not anticipated by the original SuperTask design and are not included in the description of SuperTask covered above.

These can be supported by the current data access design, with the following qualifications:

  • As executing a Pipeline always outputs to a single Run and a single Collection, only one of the Collections being compared by a SuperTask can have outputs written to it by the same Pipeline that includes the comparison SuperTask. For example, this allows a Pipeline that processes data to also include a SuperTask that compares the results to an existing Collection, but it does not permit different configurations of the same Pipeline to be executed and compared in a single step.
  • The contents of one Collection shall be used to construct the QuantumGraph that defines the full execution plan. This makes it impossible to e.g. process only DatasetRefs for which a Dataset exists in all compared Collections. It also means that the defineQuanta method for comparison SuperTasks should expect to see only one DatasetRef of any set to be compared at this time.

To implement Preflight for a Pipeline containing comparison SuperTasks, then, the activator simply executes the normal Preflight process on one of the Collections to be compared. The activator then walks the resulting QuantumGraph, identifying any DatasetRefs that represent comparisons by some combination of task names and DatasetType names obtainable from the Pipeline. For each of these, it searches for matching DatasetHandles in the other Collections and attaches these to the Quantum as additional inputs. SuperTask execution can then be run as usual.
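
A hedged sketch of that attachment step; graph.quanta, isComparisonInput, and Quantum.addPredictedInput are assumed names, not settled interfaces, and we assume Registry.find returns None when no matching Dataset exists:

# Hedged sketch: attach comparison inputs after normal Preflight.
for quantum in graph.quanta:
    for ref in list(quantum.predictedInputs):
        # Identify comparison inputs from task and DatasetType names in the
        # Pipeline (test is schematic).
        if isComparisonInput(pipeline, quantum, ref):
            for collection in otherCollections:
                handle = registry.find(collection, ref)
                if handle is not None:
                    quantum.addPredictedInput(handle)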

This adds a few small requirements on the interfaces of some of the classes involved:

DataUnit Updates and Inserts

Todo

Fill these sections in. Make sure to handle DataUnit Joins.

Raw Data Ingest

Making Master Calibrations

Defining SkyMaps