Fellow Airflowers, I am following up on some of the proposed changes in the Airflow 3 proposal <https://docs.google.com/document/d/1MTr53101EISZaYidCUKcR6mRKshXGzW6DZFXGzetG3E/>, for which the community requested more information.
One specific topic was "Interactive DAG runs". This is not yet a full-fledged AIP, but is intended to facilitate a structured discussion, which will then be followed up with a formal AIP. I have included most of the text here, but please give detailed feedback in the attached document <https://docs.google.com/document/d/1CXis3Ovmxm3pt4moiBIpsajX7GmRpGbcEfwt_blkSLI>, so that we can have a structured discussion around the specific points where more detail is required.

---

Synchronous DAG Execution use cases

Airflow has historically been used for “batch” orchestration, and this has been both a strength and a limitation. Even so, from Airflow’s early days, the concept of “triggering” a DAG run through the UI for debugging has been a core part of the value proposition. Triggering a DAG run was also added to the Airflow API, so that DAGs could be invoked programmatically rather than only on a schedule. With Data Driven Scheduling, introduced in Airflow 2.4, DAG invocation progressed further to support triggering DAGs based on Dataset update events. This was enhanced again in Airflow 2.9 with API support for Dataset update events, so that Dataset updates outside of Airflow could be used to trigger dependent data pipelines.

More recently, there have been many requests for Synchronous DAG Execution with Airflow. Some of the use cases are detailed below:

Data-driven Applications

Many enterprise applications have become data-driven applications, which provide a response to a customer based on personalized information. For example, a hotel reservation experience could include personalized options based on prior customer experience, aggregated across multiple data systems and retrieved during the customer interaction.
Data aggregation Bots

AI-based agents are commonly used to aggregate a summary or a review based on prior trained knowledge of a domain.

Generative AI

With the advent of Generative AI, a common use case for DAG execution is inference. Using the AskAstro LLM application <https://github.com/astronomer/ask-astro> as an example, the steps needed to respond to a question are along the lines of:

1. Rephrase the question using an LLM (multiple times)
2. Submit multiple versions of the question (the original and the rephrased versions) to an LLM
3. De-duplicate the results
4. Optionally verify the results, including the associated references
5. Return the answer to the question

The above steps map naturally onto a Directed Acyclic Graph. With Airflow, this could be represented as a DAG with (2) and (4) as Dynamically Mapped Tasks. However, to cleanly support this DAG execution for inference, the final step (5) needs to be supported in Airflow as a “return the result” step.

Proposal

The proposal here has multiple parts, as detailed below.

1. Enable a DAG to be run concurrently by one or many users, possibly with different parameters, without requiring a unique logical date for each DAG run. This builds on the separation between “logical date” and “data interval”, which has been worked on incrementally since Airflow 2.2. Another way of saying this is to support “non-data-interval DAG runs”. This particular feature has been requested often, including for hyperparameter tuning as part of Machine Learning. (This is not proposing that the API server itself runs the DAG.)

2. Add language support within the DAG to return the result in Pythonic form, either as a value or as a reference. In practice, this could mean the designation of a single result task in a DAG.
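As a rough illustration (not actual Airflow code), the five inference steps above can be sketched in plain Python. The helper functions below are hypothetical stand-ins for real LLM calls; in an Airflow DAG, steps (2) and (4) would be Dynamically Mapped Tasks, and the value returned by the final step would play the role of the proposed “result task”:

```python
# Hypothetical sketch of the five inference steps as a fan-out / fan-in
# pipeline. The "LLM" calls are stubbed; names here are illustrative only.

def rephrase(question: str, n: int = 3) -> list[str]:
    # Step 1: produce n rephrasings (a real LLM call would go here)
    return [f"{question} (rephrasing {i})" for i in range(1, n + 1)]

def ask_llm(question: str) -> str:
    # Step 2: one mapped task instance per question version (stubbed)
    return f"answer to: {question}"

def deduplicate(answers: list[str]) -> list[str]:
    # Step 3: drop duplicate answers while preserving order
    seen: set[str] = set()
    return [a for a in answers if not (a in seen or seen.add(a))]

def verify(answer: str) -> bool:
    # Step 4: optionally verify an answer and its references (stubbed)
    return len(answer) > 0

def run_pipeline(question: str) -> str:
    versions = [question] + rephrase(question)   # step 1
    answers = [ask_llm(v) for v in versions]     # step 2 (fan-out)
    unique = deduplicate(answers)                # step 3 (fan-in)
    verified = [a for a in unique if verify(a)]  # step 4
    return verified[0]                           # step 5: the "result task"
```

The key point of the sketch is the last line: today, an Airflow DAG has no first-class way to hand that final value back to whoever triggered the run, which is exactly what proposal part (2) above would add.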
Therefore, the API invocation for Synchronous DAG Execution would be similar to the “Trigger DAG” invocation, but would wait for a response from the DAG execution, which could then be returned to the user. The reference returned could be to a dataset or to a blob in object storage.

An initial approach for this could be:

- Restructure the existing Trigger API to return a “job id”, and
- Add a new “poll / wait for completion” API which can be invoked using the above “job id”,
- thereby enabling reliable handling of long-running jobs.

3. Return handling

It is critical for a DAG to return a status, even if it is a failure. In the current DAG execution model, there is no completion event in the case of a task failure, with the exception of a “Teardown” event. With synchronous execution, a DAG must always:

- Return the result to the invoking API as soon as the result is available (from the result task), without waiting for the teardown task (if any) to complete (the success case), and
- Return the failure status to the invoking API as soon as a key task in the DAG has failed (including upstream_failed etc.), rather than waiting for DagRun completion (i.e., waiting for teardown task completion).

---

Best regards,
Vikram Koka, Ash Berlin-Taylor, Kaxil Naik, and Constance Martineau