Hi Chris,

Yes, we could have the scenario where a DagRun contains streaming task
instances that run forever. Airflow currently focuses on scheduling batch
tasks and assumes that the tasks in a DAG will complete, so the existing
mechanism doesn't support tasks that are intended to run indefinitely. With
signal-based scheduling, however, a streaming job could send signals to the
scheduler to indicate state changes of its dependencies. In this way,
Airflow's scheduling could support a mix of streaming and batch jobs.
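To make the idea a bit more concrete, here is a minimal, self-contained
sketch of that flow in plain Python. It is only an illustration of the
concept, not the API proposed in AIP-35; the names signal_bus,
MODEL_EXPORTED and run_batch_evaluation are made up for this sketch. A
long-running streaming task publishes a signal each time it exports a new
model, and a scheduler-side loop reacts to each signal by triggering a
batch evaluation, instead of polling the metadata database:

    # Illustrative sketch only; not the proposed Airflow API.
    import queue
    import threading
    import time

    # Stand-in for the proposed Signal Notification Service.
    signal_bus = queue.Queue()

    def streaming_online_training():
        """Long-running streaming task that announces each exported model."""
        for version in range(3):      # bounded here so the sketch terminates
            time.sleep(0.1)           # pretend to train on a stream of records
            signal_bus.put({"signal": "MODEL_EXPORTED", "model_version": version})
        signal_bus.put({"signal": "STOP"})

    def run_batch_evaluation(model_version):
        """Stand-in for the downstream batch evaluation task."""
        print("Triggering batch evaluation for model v%d" % model_version)

    def scheduler_loop():
        """Stand-in for the scheduler reacting to signals instead of polling."""
        while True:
            event = signal_bus.get()
            if event["signal"] == "STOP":
                break
            if event["signal"] == "MODEL_EXPORTED":
                run_batch_evaluation(event["model_version"])

    producer = threading.Thread(target=streaming_online_training)
    producer.start()
    scheduler_loop()
    producer.join()

In the actual proposal, the queue would be the Signal Notification Service
and the reaction would be the scheduler starting the downstream batch task
(e.g. via a SignalOperator) whose condition is satisfied by the signal.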
Thanks,
Nicholas

On Wed, Jun 17, 2020 at 1:38 AM Chris Palmer <ch...@crpalmer.com> wrote:

> Nicholas,
>
> Are you saying that you actually have tasks in Airflow that are intended
> to run indefinitely?
>
> That in and of itself seems to be a huge fundamental departure from many
> of the assumptions built into Airflow.
>
> Chris
>
> On Tue, Jun 16, 2020 at 12:00 PM Gerard Casas Saez
> <gcasass...@twitter.com.invalid> wrote:
>
> > That looks interesting. My main worry is how to handle multiple
> > executions of signal-based operators. If I follow your definition
> > correctly, the idea is to run multiple evaluations of the online
> > trained model (on a permanently running DagRun). So what happens once
> > you have triggered the downstream operators using a signal? Do you
> > clear the state of the operators once a new signal is generated? Do
> > you generate a new DagRun?
> >
> > How do you evaluate the model several times while keeping a history of
> > operator execution?
> >
> > Related to this, TFX has a similar proposal for ASYNC DAGs, which
> > basically describes something similar to what you are proposing in
> > this AIP: https://github.com/tensorflow/community/pull/253 (an
> > interesting read, as it's also related to the ML field).
> >
> > My main concern would be that you may need to change a few more things
> > to support signal-based triggers, as the relationship between upstream
> > and downstream tasks is no longer 1:1 but 1:many.
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> >
> > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > > Hello everyone,
> > >
> > > I am sending this message to collect feedback on AIP-35, which adds
> > > signal-based scheduling. This was previously briefly mentioned in the
> > > development Slack channel. The key motivation of this proposal is to
> > > support a mixture of batch and stream jobs in the same workflow.
> > >
> > > In practice, many business processes need collaboration between
> > > streaming and batch jobs. For example, in machine learning, an online
> > > learning job is a streaming job that runs forever. It emits a model
> > > periodically, and a batch evaluation job should run to validate each
> > > model, so this is a streaming-triggered-batch case. Continuing from
> > > the above example, once the model passes validation, it needs to be
> > > deployed to a serving job. That serving job could be a streaming job
> > > that keeps polling records from Kafka and uses the model to make
> > > predictions. The availability of a new model requires that streaming
> > > prediction job either to restart or to reload the new model on the
> > > fly. In either case, it is a batch-to-streaming trigger.
> > >
> > > Today, people basically have to invent their own way to deal with
> > > such workflows consisting of both batch and streaming jobs. I think
> > > having a system that can smoothly handle a mixture of streaming and
> > > batch jobs is valuable here.
> > >
> > > AIP-35 focuses on signal-based scheduling to let operators send
> > > signals to the scheduler to trigger a scheduling action, such as
> > > starting, stopping, or restarting jobs. With this change to the
> > > scheduling mechanism, a streaming job can send signals to the
> > > scheduler to indicate the state change of its dependencies.
> > >
> > > Signal-based scheduling allows the scheduler to learn about a change
> > > of dependency state immediately, without periodically querying the
> > > metadata database. It also allows potential support for richer
> > > scheduling semantics, such as periodic execution and manual
> > > triggering at per-operator granularity.
> > >
> > > Changes proposed:
> > >
> > > - Signals are used to define conditions that must be met to run an
> > > operator. A state change of the upstream tasks is one type of signal;
> > > there may be other types. The scheduler may take different actions
> > > when receiving different signals. To let operators take signals as
> > > their starting condition, we propose to introduce a SignalOperator,
> > > which is mentioned in the public interface section.
> > > - A notification service is needed to receive and propagate signals
> > > from operators and other sources to the scheduler. Upon receiving a
> > > signal, the scheduler can take action according to the predefined
> > > signal-based conditions on the operators. Therefore we propose to
> > > introduce a Signal Notification Service component to Airflow.
> > >
> > > Please see the related documents and PRs for details:
> > >
> > > AIP:
> > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > Issue: https://github.com/apache/airflow/issues/9321
> > >
> > > Please let me know if there are any aspects that you agree or
> > > disagree with, or that need more clarification (especially the
> > > SignalOperator and the SignalService). Any comments are welcome and
> > > I am looking forward to them!
> > >
> > > Thanks,
> > > Nicholas