Jeremiah Lowin created AIRFLOW-825:
--------------------------------------

             Summary: Add Dataflow semantics
                 Key: AIRFLOW-825
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-825
             Project: Apache Airflow
          Issue Type: Improvement
          Components: Dataflow
            Reporter: Jeremiah Lowin
            Assignee: Jeremiah Lowin
Following discussion on the dev list, this adds first-class Dataflow semantics to Airflow. Please see my PR for examples and unit tests.

From the documentation:

A Dataflow object represents the result of an upstream task. If the upstream task has multiple outputs contained in a tuple, dict, or other indexable form, an index may be provided so the Dataflow only uses the appropriate output. Dataflows are passed to downstream tasks with a key. This has two effects:

1. It sets up a dependency between the upstream and downstream tasks to ensure that the downstream task does not run before the upstream result is available.
2. It ensures that the [indexed] upstream result is available in the downstream task's context as ``context['dataflows'][key]``. In addition, the result will be passed directly to PythonOperators as a keyword argument.

Dataflows use the XCom mechanism to exchange data. Data is passed through the following series of steps:

1. After the upstream task runs, the data is passed to the Dataflow object's _set_data() method.
2. The Dataflow's serialize() method is called on the data. This method takes the data object and returns a representation that can be used to reconstruct it later.
3. _set_data() stores the serialized result as an XCom.
4. Before the downstream task runs, it calls the Dataflow's _get_data() method.
5. _get_data() retrieves the upstream XCom.
6. The Dataflow's deserialize() method is called. This method takes the serialized representation and returns the data object.
7. The data object is passed to the downstream task.

The basic Dataflow object has identity serialize and deserialize methods, meaning data is stored directly in the Airflow database. Therefore, for performance and practical reasons, basic Dataflows should not be used with large or complex results.

Dataflows can easily be extended to use remote storage. In this case, the serialize method should write the data into storage and return a URI, which will be stored as an XCom.
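The seven steps above can be sketched roughly as follows. The method names (serialize, deserialize, _set_data, _get_data) come from the description; everything else is illustrative, and a plain dict stands in for the XCom backend so the example runs outside Airflow:

```python
class Dataflow:
    """Represents the result of an upstream task, optionally indexed."""

    def __init__(self, upstream_task_id, index=None):
        self.upstream_task_id = upstream_task_id
        self.index = index

    def serialize(self, data):
        # Basic Dataflow: identity serialization, so the data itself
        # would be stored directly in the Airflow database.
        return data

    def deserialize(self, representation):
        # Identity deserialization.
        return representation

    def _set_data(self, data, xcom_store):
        # Steps 1-3: select the indexed output (if any), serialize it,
        # and store the result as an "XCom".
        if self.index is not None:
            data = data[self.index]
        xcom_store[self.upstream_task_id] = self.serialize(data)

    def _get_data(self, xcom_store):
        # Steps 4-7: retrieve the upstream XCom and reconstruct the data
        # object for the downstream task.
        return self.deserialize(xcom_store[self.upstream_task_id])


# Upstream task returns a tuple; the downstream task only needs element 1.
xcoms = {}
flow = Dataflow("extract_task", index=1)
flow._set_data(("header", [1, 2, 3]), xcoms)
result = flow._get_data(xcoms)  # -> [1, 2, 3]
```

This is a sketch of the mechanics only; in the actual proposal the store and retrieve calls would go through Airflow's XCom tables, keyed by task and execution date.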
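The remote-storage extension could look something like the sketch below. The class name is hypothetical, and a local temp file stands in for remote storage; the point is only that serialize() writes the data out and returns a URI, so the XCom table holds a short string rather than the data itself:

```python
import json
import os
import tempfile


class FileStorageDataflow:
    """Illustrative Dataflow variant that keeps results out of the Airflow DB."""

    def serialize(self, data):
        # Write the data to "remote" storage (a temp file here) and
        # return its URI; only the URI would be stored as an XCom.
        fd, path = tempfile.mkstemp(suffix=".json")
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        return "file://" + path

    def deserialize(self, uri):
        # The URI retrieved from the XCom is used to download and
        # reconstruct the original data object.
        path = uri[len("file://"):]
        with open(path) as f:
            return json.load(f)


flow = FileStorageDataflow()
uri = flow.serialize({"rows": [1, 2, 3]})   # only this string hits the XCom table
restored = flow.deserialize(uri)
```

In a real deployment serialize() would target something like GCS or S3 and return the object's URI, but the contract is the same.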
The URI will be passed to deserialize() so that the data can be downloaded and reconstructed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)