Hi Gerard,

Regarding the question you raised above, that's a good point. The Operator currently has a task_id attribute that is the same as the operator id, so when a task instance runs multiple times the task_id would be the same each time. We therefore do need to change more than what is in the current proposal.

To solve the problem, the single task_id attribute needs to become a task_ids attribute to support multiple executions. In signal-based scheduling, an operator that executes multiple times uses task_ids to maintain the list of task instances belonging to the same DagRun. Whenever the downstream operator receives a signal from upstream, the scheduler creates a new task instance, and the operator appends that new task instance id to task_ids.
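To make that concrete, here is a rough, self-contained sketch of the bookkeeping I have in mind. The class and method names below are placeholders for illustration only, not the interface proposed in the AIP:

    # Illustrative sketch only: the bookkeeping needed when an operator can
    # be executed multiple times within the same DagRun.
    from collections import defaultdict
    from typing import Dict, List


    class SignalTriggeredOperator:
        """Toy stand-in for an operator that keeps one entry per execution.

        Instead of a single task_id (identical to the operator id), it tracks
        the task instance ids created for each DagRun in a task_ids list.
        """

        def __init__(self, operator_id: str) -> None:
            self.operator_id = operator_id
            # run_id -> list of task instance ids created for that DagRun
            self.task_ids: Dict[str, List[str]] = defaultdict(list)

        def on_signal(self, run_id: str) -> str:
            """Called when an upstream signal arrives: create a new task
            instance id and append it instead of overwriting task_id."""
            new_ti_id = f"{self.operator_id}__{len(self.task_ids[run_id]) + 1}"
            self.task_ids[run_id].append(new_ti_id)
            return new_ti_id


    if __name__ == "__main__":
        op = SignalTriggeredOperator("evaluate_model")
        # Two signals within the same DagRun produce two task instances,
        # and both ids stay in the history for that run.
        print(op.on_signal("manual__2020-06-17"))
        print(op.on_signal("manual__2020-06-17"))
        print(op.task_ids["manual__2020-06-17"])

This also speaks to your question about keeping a history of operator executions: the history lives in the per-DagRun list rather than in a single, overwritten id.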
Regards,
Nicholas Jiang

On Wed, Jun 17, 2020 at 2:55 PM Kevin Yang <yrql...@gmail.com> wrote:

> I'm in general supportive of this idea of supporting streaming jobs. We in Airbnb have historically run streaming jobs on Airflow for years, with some hacks of course. Yes, the streaming jobs might not be idempotent or otherwise fit the Airflow paradigm, but I personally would love to see Airflow expanded to support more use cases and become the go-to workflow orchestration solution for companies.
>
> With that said, I would prefer we carefully build a paved path for streaming job scheduling in Airflow instead of a solution for one particular use case. Besides different scheduling-model requirements, streaming jobs may need more, e.g. much shorter scheduling latency and a singleton guarantee. AIP-35 aims to solve the issue that a downstream task may need to be kicked off even while the upstream job is still running. This is indeed a solid problem, but my first impression is that adding an entire signal notification system with a metadata DB and touching the core scheduling logic for it might be a bit overkill. Maybe it would be enough if the upstream still sends out a signal, and we build a hacky operator that runs together with the upstream task but performs no op until its listener gets a start signal from the upstream task.
>
> Just my two cents, with hope to get more ideas on this topic.
>
> Cheers,
> Kevin Y
>
> On Tue, Jun 16, 2020 at 11:06 PM 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
>
> > Hi Bharath,
> >
> > Could you please provide the permalink of your previous discussion about trigger-based dag runs?
> >
> > In my opinion, compared with trigger-based dag runs, signal-based scheduling provides finer-grained triggers to determine whether or not to run the task instances of an operator. External systems outside of Airflow could use the SignalService to send signals to the scheduler to indicate a state change of the operator's dependencies, which is indeed useful for such systems.
> >
> > Regards,
> > Nicholas Jiang
> >
> > On Wed, Jun 17, 2020 at 2:41 AM bharath palaksha <bharath...@gmail.com> wrote:
> >
> > > I had started a similar discussion earlier: trigger-based dag runs, where a sensor instead of a cron expression tells whether to trigger the dag run or not. This is similar to that. It is very useful when you have external systems outside of Airflow which can't be programmed to use the REST API.
> > >
> > > On Tue, Jun 16, 2020, 11:08 PM Chris Palmer <ch...@crpalmer.com> wrote:
> > >
> > > > Nicholas,
> > > >
> > > > Are you saying that you actually have tasks in Airflow that are intended to run indefinitely?
> > > >
> > > > That in and of itself seems to be a huge fundamental departure from many of the assumptions built into Airflow.
> > > >
> > > > Chris
> > > >
> > > > On Tue, Jun 16, 2020 at 12:00 PM Gerard Casas Saez <gcasass...@twitter.com.invalid> wrote:
> > > >
> > > > > That looks interesting. My main worry is how to handle multiple executions of signal-based operators. If I follow your definition correctly, the idea is to run multiple evaluations of the online-trained model (on a permanently running DAGRun). So what happens once you have triggered the downstream operators using a signal? Do you clear the state of the operators when a new signal is generated? Do you generate a new DAGRun?
> > > > >
> > > > > How do you evaluate the model several times while keeping a history of operator execution?
> > > > >
> > > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which basically describes something similar to what you are proposing in this AIP: https://github.com/tensorflow/community/pull/253 (an interesting read as it’s also related to the ML field).
> > > > >
> > > > > My main concern would be that you may need to change a few more things to support signal-based triggers, as the relationship between upstream tasks and downstream tasks is no longer 1:1 but 1:many.
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez
> > > > >
> > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > > > > > Hello everyone,
> > > > > >
> > > > > > I am sending this message to everyone to collect feedback on AIP-35, which adds signal-based scheduling. This was previously mentioned briefly in the development Slack channel. The key motivation of this proposal is to support a mixture of batch and streaming jobs in the same workflow.
> > > > > >
> > > > > > In practice, a lot of business logic requires collaboration between streaming and batch jobs. For example, in machine learning, an online learning job is a streaming job that runs forever. It emits a model periodically, and a batch evaluation job should run to validate each model, so this is a streaming-triggered-batch case. Continuing the example, once the model passes validation it needs to be deployed to a serving job. That serving job could be a streaming job that keeps polling records from Kafka and uses the model for prediction, and the availability of a new model requires the streaming prediction job either to restart or to reload the new model on the fly. In either case, it is a batch-to-streaming triggering.
> > > > > >
> > > > > > Today, people basically have to invent their own way to deal with such workflows consisting of both batch and streaming jobs. I think having a system that can smoothly handle a mixture of streaming and batch jobs is valuable here.
> > > > > >
> > > > > > AIP-35 focuses on signal-based scheduling, letting operators send signals to the scheduler to trigger a scheduling action such as starting jobs, stopping jobs and restarting jobs.
> > > > > > With this change to the scheduling mechanism, a streaming job can send signals to the scheduler to indicate a state change of its dependencies.
> > > > > >
> > > > > > Signal-based scheduling allows the scheduler to learn about a change in dependency state immediately, without periodically querying the metadata database. It also opens the door to richer scheduling semantics such as periodic execution and manual triggering at per-operator granularity.
> > > > > >
> > > > > > Changes proposed:
> > > > > >
> > > > > > - Signals are used to define conditions that must be met to run an operator. A state change of an upstream task is one type of signal; there may be other types, and the scheduler may take different actions when receiving different signals. To let operators take signals as their starting condition, we propose to introduce a SignalOperator, which is described in the public interface section.
> > > > > > - A notification service is necessary to receive the signals from operators and other sources and propagate them to the scheduler. Upon receiving a signal, the scheduler can act according to the predefined signal-based conditions on the operators. We therefore propose to introduce a Signal Notification Service component to Airflow.
> > > > > >
> > > > > > Please see the related documents and PRs for details:
> > > > > >
> > > > > > AIP: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > > >
> > > > > > Please let me know about any aspects you agree or disagree with, or that need more clarification (especially the SignalOperator and SignalService). Any comments are welcome and I am looking forward to them!
> > > > > >
> > > > > > Thanks,
> > > > > > Nicholas
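P.S. For anyone catching up on the thread through the quoted proposal above: the toy sketch below illustrates the push-based flow the AIP describes, where operators send signals to a notification service and the scheduler reacts instead of polling the metadata database. None of the class or method names here are the actual interfaces from the AIP; they are placeholders for illustration only.

    # Toy model only (no real Airflow APIs): operators push signals to a
    # notification service and a scheduler-side callback reacts to them.
    from collections import defaultdict
    from typing import Callable, DefaultDict, List


    class ToySignalNotificationService:
        """Minimal stand-in for the proposed notification component."""

        def __init__(self) -> None:
            self._listeners: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

        def subscribe(self, signal_name: str, callback: Callable[[str], None]) -> None:
            # The scheduler registers the signal-based conditions it cares about.
            self._listeners[signal_name].append(callback)

        def send(self, signal_name: str, payload: str) -> None:
            # An operator (or an external system) reports a state change.
            for callback in self._listeners[signal_name]:
                callback(payload)


    if __name__ == "__main__":
        service = ToySignalNotificationService()

        # "Scheduler" side: start a batch evaluation whenever a model is ready.
        def schedule_evaluation(model_path: str) -> None:
            print(f"scheduler: creating task instance to evaluate {model_path}")

        service.subscribe("model_exported", schedule_evaluation)

        # "Streaming job" side: the online training operator emits a signal
        # each time it exports a new model, without ever finishing itself.
        service.send("model_exported", "/models/online/v42")
        service.send("model_exported", "/models/online/v43")

The real proposal of course also has to cover persistence and delivery of signals; this sketch is only meant to show the control flow that replaces periodic polling of the metadata database.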