How is the UI going to represent this? How do you navigate between them?
There is already the concept of try_number -- can that be used instead of changing task_id to task_ids?

On Jun 17 2020, at 11:24 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
> Hi Gerard,
>
> Regarding the question mentioned above, it's a good point. The Operator
> currently contains a task_id attribute which is the same as the operator
> id. Therefore, when a task instance runs multiple times, the task_id
> would be the same. So we do need to change something more than what is in
> the current proposal.
>
> To solve the problem, we need to change the task_id attribute to task_ids
> for multiple executions. In signal-based scheduling, multiple executions
> of operators need to use the task_ids attribute to maintain the list of
> task instances corresponding to the same DagRun. Once the downstream
> operator receives the signal from upstream, the scheduler always creates
> a new task instance. At this time, the Operator should append the new
> task instance id to task_ids.
>
> Regards,
> Nicholas Jiang
>
> On Wed, Jun 17, 2020 at 2:55 PM Kevin Yang <yrql...@gmail.com> wrote:
>
>> I'm in general supportive of this idea of supporting streaming jobs. We
>> in Airbnb have historically run streaming jobs for years on Airflow,
>> with some hacks of course. Yes, the streaming jobs might not be
>> idempotent or otherwise fit the Airflow paradigm, but I personally would
>> love to see Airflow expanded to support more use cases and be the go-to
>> workflow orchestration solution for companies.
>>
>> With that said, I would prefer we carefully build a paved path for
>> streaming job scheduling in Airflow instead of a solution for one
>> particular use case. Besides different scheduling model requirements,
>> streaming jobs may need more, e.g. much shorter scheduling latency and a
>> singleton guarantee. AIP-35 aims to solve the issue that a downstream
>> task may need to be kicked off even while the upstream job is still
>> running.
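The task_ids bookkeeping Nicholas describes above (appending a fresh task instance id for each signal-triggered execution within the same DagRun) might be sketched as follows. All names here are illustrative assumptions for this thread, not Airflow's actual implementation:

```python
# Hypothetical sketch of the proposed task_id -> task_ids change.
# SignalScheduledOperator and the id format are made up for illustration:
# the point is only that each new signal appends a new task instance id
# for the same DagRun instead of reusing one task_id.
from dataclasses import dataclass, field
from typing import List


@dataclass
class SignalScheduledOperator:
    """Toy model of an operator that may run many times per DagRun."""
    operator_id: str
    task_ids: List[str] = field(default_factory=list)  # one id per execution

    def on_signal(self, dag_run_id: str) -> str:
        """Create and record a new task instance id when a signal arrives."""
        new_id = f"{self.operator_id}__{dag_run_id}__run{len(self.task_ids) + 1}"
        self.task_ids.append(new_id)
        return new_id


op = SignalScheduledOperator(operator_id="evaluate_model")
op.on_signal("dagrun_2020_06_17")
op.on_signal("dagrun_2020_06_17")
print(op.task_ids)
# ['evaluate_model__dagrun_2020_06_17__run1',
#  'evaluate_model__dagrun_2020_06_17__run2']
```

This also makes the try_number question concrete: try_number counts retries of one task instance, whereas the sketch above creates distinct task instances per signal.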
>> This is indeed a solid problem, but my first impression is that adding
>> an entire signal notification system with a metadata DB and touching the
>> core scheduling logic for it might be a bit of overkill. Maybe it would
>> be enough if the upstream still sends out a signal, and we build a hacky
>> operator that runs together with the upstream task but performs no op
>> until its listener gets a start signal from the upstream task.
>>
>> Just my two cents, with hope to get more ideas on this topic.
>>
>> Cheers,
>> Kevin Y
>>
>> On Tue, Jun 16, 2020 at 11:06 PM 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
>>
>> > Hi barath,
>> >
>> > Could you please provide the permalink of your previous discussion
>> > about trigger-based DAG runs?
>> >
>> > In my opinion, compared with trigger-based DAG runs, signal-based
>> > scheduling provides finer-grained triggers to determine whether to run
>> > the task instances of the Operator or not. External systems outside of
>> > Airflow could use SignalService to send signals to the scheduler to
>> > indicate the state change of the operator's dependencies. It's indeed
>> > useful for external systems outside of Airflow.
>> >
>> > Regards,
>> > Nicholas Jiang
>> >
>> > On Wed, Jun 17, 2020 at 2:41 AM bharath palaksha <bharath...@gmail.com>
>> > wrote:
>> >
>> > > I had started a similar discussion earlier:
>> > > trigger-based DAG runs, with a sensor instead of a cron expression
>> > > that tells whether to trigger the DAG run or not. This is similar to
>> > > that. It is very useful when you have external systems outside of
>> > > Airflow which can't be programmed to use the REST API.
>> > >
>> > > On Tue, Jun 16, 2020, 11:08 PM Chris Palmer <ch...@crpalmer.com>
>> > > wrote:
>> > >
>> > > > Nicholas,
>> > > >
>> > > > Are you saying that you actually have tasks in Airflow that are
>> > > > intended to run indefinitely?
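Kevin's suggested workaround earlier in the thread (a "hacky" operator started together with the upstream task that performs no op until its listener receives a start signal) could be sketched roughly as below. The listener and operator classes are illustrative assumptions, not an existing Airflow API; a real version would likely subclass Airflow's BaseSensorOperator and poke an external signal source:

```python
# Toy sketch of a no-op "wait for signal" operator. SignalListener stands
# in for whatever channel the upstream task would use to publish its
# start signal (e.g. a queue or notification service); all names are
# invented for illustration.
import time


class SignalListener:
    """Toy signal channel; in practice this would be external."""
    def __init__(self):
        self._received = set()

    def send(self, name):
        self._received.add(name)

    def has_signal(self, name):
        return name in self._received


class WaitForSignalOperator:
    def __init__(self, listener, signal_name, poke_interval=0.01):
        self.listener = listener
        self.signal_name = signal_name
        self.poke_interval = poke_interval

    def execute(self):
        # No-op loop: block until the upstream task emits the start signal.
        while not self.listener.has_signal(self.signal_name):
            time.sleep(self.poke_interval)
        return f"received {self.signal_name}"


listener = SignalListener()
listener.send("model_ready")  # upstream (still-running) task emits a signal
result = WaitForSignalOperator(listener, "model_ready").execute()
print(result)  # received model_ready
```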
>> > > >
>> > > > That in and of itself seems to be a huge fundamental departure
>> > > > from many of the assumptions built into Airflow.
>> > > >
>> > > > Chris
>> > > >
>> > > > On Tue, Jun 16, 2020 at 12:00 PM Gerard Casas Saez
>> > > > <gcasass...@twitter.com.invalid> wrote:
>> > > >
>> > > > > That looks interesting. My main worry is how to handle multiple
>> > > > > executions of signal-based operators. If I follow your definition
>> > > > > correctly, the idea is to run multiple evaluations of the online
>> > > > > trained model (on a permanently running DagRun). So what happens
>> > > > > after you have triggered the downstream operators using a signal
>> > > > > once? Do you clear the state of the operators once a new signal
>> > > > > is generated? Do you generate a new DagRun?
>> > > > >
>> > > > > How do you evaluate the model several times while keeping a
>> > > > > history of operator execution?
>> > > > >
>> > > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which
>> > > > > basically describes something similar to what you are proposing
>> > > > > in this AIP:
>> > > > > https://github.com/tensorflow/community/pull/253
>> > > > > (an interesting read, as it's also related to the ML field).
>> > > > >
>> > > > > My main concern would be that you may need to change a few more
>> > > > > things to support signal-based triggers, as the relationship
>> > > > > between upstream tasks and downstream tasks is no longer 1:1 but
>> > > > > 1:many.
>> > > > >
>> > > > > Gerard Casas Saez
>> > > > > Twitter | Cortex | @casassaez
>> > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>,
>> > > > > wrote:
>> > > > > > Hello everyone,
>> > > > > >
>> > > > > > Sending a message to everyone and collecting feedback on AIP-35
>> > > > > > on adding signal-based scheduling.
>> > > > > > This was previously briefly mentioned in the discussion on the
>> > > > > > development Slack channel. The key motivation of this proposal
>> > > > > > is to support a mixture of batch and stream jobs in the same
>> > > > > > workflow.
>> > > > > >
>> > > > > > In practice, there is a lot of business logic that needs
>> > > > > > collaboration between streaming and batch jobs. For example, in
>> > > > > > machine learning, an online learning job is a streaming job
>> > > > > > that runs forever. It emits a model periodically, and a batch
>> > > > > > evaluation job should run to validate each model. So this is a
>> > > > > > streaming-triggered-batch case. Continuing from the above
>> > > > > > example, once the model passes validation, it needs to be
>> > > > > > deployed to a serving job. That serving job could be a
>> > > > > > streaming job that keeps polling records from Kafka and uses a
>> > > > > > model to do prediction. The availability of a new model then
>> > > > > > requires the streaming prediction job either to restart, or to
>> > > > > > reload the new model on the fly. In either case, it is a
>> > > > > > batch-to-streaming job trigger.
>> > > > > >
>> > > > > > At this point, people basically have to invent their own way to
>> > > > > > deal with such workflows consisting of both batch and streaming
>> > > > > > jobs. I think having a system that can smoothly work with a
>> > > > > > mixture of streaming and batch jobs is valuable here.
>> > > > > >
>> > > > > > AIP-35 focuses on signal-based scheduling to let the operators
>> > > > > > send signals to the scheduler to trigger a scheduling action,
>> > > > > > such as starting jobs, stopping jobs and restarting jobs.
>> > > > > > With this change to the scheduling mechanism, a streaming job
>> > > > > > can send signals to the scheduler to indicate the state change
>> > > > > > of its dependencies.
>> > > > > >
>> > > > > > Signal-based scheduling allows the scheduler to learn of a
>> > > > > > change in dependency state immediately, without periodically
>> > > > > > querying the metadata database. This also allows potential
>> > > > > > support for richer scheduling semantics, such as periodic
>> > > > > > execution and manual triggering at per-operator granularity.
>> > > > > >
>> > > > > > Changes proposed:
>> > > > > >
>> > > > > > - Signals are used to define conditions that must be met to run
>> > > > > > an operator. A state change of the upstream tasks is one type
>> > > > > > of signal; there may be other types. The scheduler may take
>> > > > > > different actions when receiving different signals. To let the
>> > > > > > operators take signals as their starting condition, we propose
>> > > > > > to introduce a SignalOperator, which is described in the public
>> > > > > > interface section.
>> > > > > > - A notification service is necessary to receive and propagate
>> > > > > > the signals from the operators and other sources to the
>> > > > > > scheduler. Upon receiving a signal, the scheduler can take
>> > > > > > action according to the predefined signal-based conditions on
>> > > > > > the operators. Therefore we propose to introduce a Signal
>> > > > > > Notification Service component to Airflow.
>> > > > > >
>> > > > > > Please see the related documents and PRs for details:
>> > > > > >
>> > > > > > AIP:
>> > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
>> > > > > > Issue: https://github.com/apache/airflow/issues/9321
>> > > > > >
>> > > > > > Please let me know if there are any aspects that you agree or
>> > > > > > disagree with, or that need more clarification (especially the
>> > > > > > SignalOperator and SignalService). Any comments are welcome and
>> > > > > > I am looking forward to them!
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Nicholas
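The signal flow the proposal describes (operators and external sources emit signals; a notification service propagates them; the scheduler starts downstream work per signal) could be sketched end to end as follows. Every name here (SignalService, the register/emit methods, the signal names) is an assumption made for illustration; the AIP document defines the actual interfaces:

```python
# Toy sketch of the AIP-35 signal flow using the thread's ML example: a
# streaming online-learning job emits one signal per exported model, and a
# batch evaluation is started for each signal. Not the real Airflow API.
from collections import defaultdict


class SignalService:
    """Toy notification service: routes named signals to handlers."""
    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, signal_name, handler):
        """A scheduler would register a handler per signal-based condition."""
        self._handlers[signal_name].append(handler)

    def emit(self, signal_name, payload):
        """An operator or external system sends a signal with a payload."""
        for handler in self._handlers[signal_name]:
            handler(payload)


service = SignalService()
evaluated = []

# Downstream batch task: started once per "model_exported" signal,
# rather than once per upstream task-state change.
service.register("model_exported",
                 lambda model: evaluated.append(f"evaluated {model}"))

# Upstream streaming job: emits a signal each time it exports a model,
# while it itself keeps running.
for version in ("model_v1", "model_v2"):
    service.emit("model_exported", version)

print(evaluated)  # ['evaluated model_v1', 'evaluated model_v2']
```

This also makes Gerard's 1:many concern concrete: one long-running upstream task produces many downstream executions, which a pure task-state dependency model cannot express.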