Re-read this a couple of times. Some thoughts:

• I agree on calling it event-based scheduling; it may be less misleading.
• I believe we can mostly all agree that this is something interesting to add to Airflow and a use case that would be good to support. Nailing the details and the path to implement it is important though (or at least that's my understanding).
• In my opinion, the best way would be to extend BaseOperator to incorporate signals, as suggested before. Adding a new operator seems like it will make things more complex for the end user.
• We should allow users to configure whether they want to use event-based scheduling (async/signal) or operator dependency scheduling (sync/scheduler). This would allow users to choose the mode they want to run their DAG in, and potentially also change the execution UI for the DAG.
• Still concerned about adding logic to issue signals in user code.
• An alternative option here is to use XCom/data lineage artifacts as the signal backend. That is, when an operator A pushes a new value of an XCom keyed K, execute operator B with that XCom value. Most of the use cases I can think of for event-based scheduling can adapt to this concept, where downstream operators need to run whenever a new data artifact is generated by an upstream operator.
• The issue here is that we don't have a good data artifact representation in Airflow, as artifacts tend to be implicit (rather than explicit). Hopefully with Functional DAGs we can improve this.
• We should also consider how we would implement a use case where it's not just one long running task, but several tasks that run on different schedules and need a common downstream task to be run after either of them finishes (example: train a new model every time new examples are available or new embeddings are available). This is why I think a more high-level design could benefit the proposal, i.e. async DAG execution.
• The UI is going to be affected by the clear-task-instance behavior you describe. As a user I will only be able to see the latest task instance, and that's no good. The web UI will need to be changed to accommodate this, and we will need to find a way to show multiple task instances for a single task_id, plus lineage visualization for understanding which event/task instance generated which execution of a task instance.
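As a minimal sketch of the XCom-keyed triggering idea above (all names here are hypothetical, not an existing Airflow API): whenever an upstream operator pushes a new value under a watched key, every registered downstream callback runs with that value.

```python
# Hypothetical sketch (NOT an existing Airflow API) of XCom-keyed triggering:
# when an upstream operator pushes a new value under a watched key, every
# registered downstream callback runs with that value.
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple

class XComEventBus:
    """Tiny in-memory stand-in for an XCom-backed signal backend."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def on_push(self, key: str, callback: Callable[[Any], None]) -> None:
        # Downstream operator B registers interest in key K.
        self._listeners[key].append(callback)

    def push(self, key: str, value: Any) -> None:
        # Upstream operator A pushes a new XCom value; fire all listeners.
        for callback in self._listeners[key]:
            callback(value)

bus = XComEventBus()
runs: List[Tuple[str, str]] = []
# "Run the evaluate task whenever a new model path is pushed."
bus.on_push("model_path", lambda v: runs.append(("evaluate_model", v)))
bus.push("model_path", "s3://models/v1")
bus.push("model_path", "s3://models/v2")
```

Each push would map naturally to one execution of the downstream operator, which is the 1:many fan-out discussed later in the thread.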
Gerard Casas Saez
Twitter | Cortex | @casassaez
On Jun 17, 2020, 10:22 AM -0600, Becket Qin <becket....@gmail.com>, wrote:
> Hi Kevin and Ash,
>
> Thanks for the feedback / comments / suggestions. It is great to know that Airbnb has already been running stream jobs using AirFlow and there might be a working solution.
>
> First of all, I'd like to say that we are still quite new to AirFlow; our proposal has likely overlooked many things, and that's why we would like to get advice from the community. So we really appreciate suggestions and feedback.
>
> I can't agree more that we should introduce mechanisms to AirFlow instead of just making it support some specific use cases. The two scenarios mentioned by Nicholas above are just some examples to explain how signal-based scheduling enables new possibilities. It does not suggest that our goal is just to make those two cases work.
>
> Just to add some background to the proposal, the original motivation of this AIP actually came from our project in the Apache Flink community called AI Flow [1][2]. It aims to define a clear API for people to describe their machine learning workflows that contain both stream and batch jobs. We have a basic implementation of signal-based scheduling but would like to switch to AirFlow so we don't reinvent the wheel.
>
> In any case, it is probably useful to clarify a little bit on the rationale behind the proposal. The current AIP-35 proposes two major changes:
>
> *Signal-based scheduling*
> The key idea is to expand the conditions that trigger a scheduling action, from job status changes to a more extensible concept of *signal* or *event*. This concept extension brings two benefits:
>
> 1. Make AirFlow capable of dealing with richer types of workflows, including long running jobs, more smoothly.
> Although the two cases mentioned above may be supported with AirFlow as is, the solutions seem like workarounds and are not something designed as a first-class citizen. It essentially relies on the user code to put together some disjoint workflow submissions to form a complete business logic. Ideally we would like to see the entire business logic as an integral workflow.
> 2. Allow downstream projects to integrate with AirFlow more easily.
> As an example, imagine an operator in a workflow which deploys a model to an online inference service. In some cases, such model deployment also requires external approvals which are not a part of the workflow. With a scheduler taking external signals, it becomes much easier to integrate the workflow with internal approval systems.
>
> *Signal notification service*
> The signal notification service is also helpful in two aspects:
>
> 1. It helps isolate the signal sender and signal receivers.
> That means an operator who sends a signal does not need to be aware of who might be listening to that signal and what action has to be taken. One signal may have multiple listeners, and thus multiple workflow actions may be triggered.
> 2. It allows pluggability and, again, integration with existing systems.
> For example, some users may decide to have a Kafka-based notification service.
>
> We do realize that this proposal is a significant change to AirFlow and expect it will take some time to sort everything out. I am wondering if doing the following in order would help the discussion:
>
> 1. First reach consensus on whether this is worth doing or not.
> 2. If we think it is worth doing, decide what the ideal final state would be.
> 3. Come up with the implementation plan and migration path from the current state to the ideal final state.
>
> What do you think?
>
> Re: Ash
> About some of the questions and suggestions.
> > And I'm also not a fan of the name -- perhaps just my background and dealing with the scheduler and executor code -- but to me a signal is a Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described as "Event-driven Scheduling".
>
> I don't have a strong opinion on the naming. We picked "signal" instead of "event" simply because that sounds more relevant to "control". But I think "event" is also a suitable term and it is intuitive enough.
>
> In terms of your proposed class layout:
>
> > class BaseOperator(SignalOperator, LoggingMixin):
> >
> > Don't create the extra class, just put new things on BaseOperator.
>
> Tao raised a similar question in the Google doc. I agree that it would work by putting new things on BaseOperator. I think one reason for having a separate SignalOperator class is that, theoretically speaking, the scheduler only needs to deal with the SignalOperator interface. If so, we may want to eventually migrate from the current BaseOperator to SignalOperator. The job status change will just become one type of signal. So new operators might be able to just implement a SignalOperator instead of BaseOperator. Having a separate SignalOperator would make it easier to eventually converge on SignalOperator and deprecate the current BaseOperator.
>
> > Doesn't this require custom code inside the streaming job to report events back to Airflow?
>
> For the job status change events / signals, users don't have to do anything. It is just that currently the job statuses are communicated back to the scheduler via the database; now it is going to be via the signal notification service.
> For the signals that are not job status related, user code needs to send a signal / event at some point.
>
> > 3. How do we notice if that long running task dies without a trace -- i.e. if it crashes hard/the Kubernetes node it is on dies, etc. and never signals that it errored, how does Airflow know?
> In our basic implementation, we have a job status listener component that can work with K8S / Yarn / other cloud platforms to keep track of job status changes. The job status change signals are actually sent by it. Does AirFlow right now also listen to the status of the jobs running in K8S? Can you help us understand a bit more about the current failure handling model of AirFlow?
>
> > 4. Your new key component, the Signal Notification Service, is not clearly defined.
> > Where does it store things?
> > Does it need persistent storage, or is in-memory good enough?
> > Does it need at-most-once delivery semantics? At-least-once delivery?
> > Does the scheduler pull from it, or does the signal service push to the scheduler?
> > What happens when there is more than one scheduler running (AIP-15, which I am working on right now)?
> > How do we run more than one notification service component (i.e. HA -- once the scheduler is HA we can't have any other new components being a single point of failure)?
> > Do signals get delivered to just one scheduler, or all schedulers?
>
> Sorry about the confusion. It is true that a bunch of details are missing here. We will add them to the wiki. To answer the questions quickly:
>
> - In our basic implementation, the default signal notification service stores the signals in MySQL.
> - The notification service has a REST API that works in a long-poll manner, i.e. a client sends an HTTP request with the last version of the keys it has seen, and that request waits on the server side until one of the following is met:
>   - The request timeout is reached, in which case an empty response is returned.
>   - A new version of the signal of interest is available. The notification service returns all the SignalVersions newer than the version last seen by the client.
> - The scheduler pulls from the notification service, but in a long-poll manner.
> - Off the top of my head, the notification service probably just needs one instance, which supports multiple schedulers. It could have more instances for scalability.
> - Technically speaking, the notification service frontend itself is stateless, so failure recovery is simple. If the database behind the notification service has HA, that should be sufficient.
> - Signals will be delivered to all the schedulers that are listening to that signal.
>
> > 5. Does this now mean to get started out with Airflow you need to run three components: webserver, scheduler and signal service?
>
> Not sure if there is a problem, but I imagine that for a local deployment, a signal service could be a part of the webserver, just with one more REST endpoint.
>
> Thanks again for all the great feedback, and looking forward to your suggestions.
>
> Cheers,
>
> Jiangjie (Becket) Qin
>
> [1] https://www.youtube.com/watch?v=xiYJTCj2zUU
> [2] https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow
>
> On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>
> > I also agree with Kevin - I'd love to see better streaming support in Airflow, but I'm not sure this is the way to go about it. Something about it feels not quite right.
> >
> > And I'm also not a fan of the name -- perhaps just my background and dealing with the scheduler and executor code -- but to me a signal is a Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described as "Event-driven Scheduling".
> >
> > To take your example:
> >
> > > For example, in machine learning, an online learning job is a streaming job running forever. It emits a model periodically and a batch evaluate job should run to validate each model
> >
> > This is possible right now in Airflow by making an API request to trigger a run of the validation DAG.
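[Editorial aside: the long-poll exchange Becket describes above can be sketched as follows. All names are hypothetical illustrations, not the proposal's actual API.]

```python
# Hedged sketch of the described long-poll contract (names hypothetical):
# the client asks "anything newer than version N for key K?" and receives
# every newer SignalVersion, or an empty list on timeout.
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class SignalVersion:
    key: str
    version: int
    payload: str

class NotificationStore:
    """In-memory stand-in for the notification service's MySQL backend."""

    def __init__(self) -> None:
        self._log: Dict[str, List[SignalVersion]] = {}

    def send(self, key: str, payload: str) -> None:
        # Each send produces a new, monotonically increasing version.
        versions = self._log.setdefault(key, [])
        versions.append(SignalVersion(key, len(versions) + 1, payload))

    def poll(self, key: str, last_seen: int) -> List[SignalVersion]:
        # A real server would park this request until the timeout or until
        # a newer version arrives; here we just return what is already newer.
        return [s for s in self._log.get(key, []) if s.version > last_seen]

store = NotificationStore()
store.send("model_ready", "model-v1")
store.send("model_ready", "model-v2")
newer = store.poll("model_ready", last_seen=1)  # the scheduler has seen v1
```

Because the version cursor lives with the client, any number of schedulers can poll the same store independently, matching the "delivered to all listening schedulers" answer above.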
> > In terms of your proposed class layout:
> >
> > class BaseOperator(SignalOperator, LoggingMixin):
> >
> > Don't create the extra class, just put new things on BaseOperator.
> >
> > As Chris said:
> >
> > > Are you saying that you actually have tasks in Airflow that are intended to run indefinitely?
> >
> > 1. How is Airflow meant to handle that task? Right now it runs it _and waits for completion_. It's not clear you've actually thought through everything this change would involve at the code level -- specifically in the Executors.
> >
> > 2. Doesn't this require custom code inside the streaming job to report events back to Airflow?
> >
> > I am very uneasy about _needing_ to make changes in the "task code" to support running it under Airflow.
> >
> > One of the big plus points to me is that right now Airflow can run whatever tasks or jobs you like, without your job code needing to know or care about Airflow. This approach changes that. And this is not a decision to be made lightly.
> >
> > 3. How do we notice if that long running task dies without a trace -- i.e. if it crashes hard/the Kubernetes node it is on dies, etc. and never signals that it errored, how does Airflow know?
> >
> > 4. Your new key component, the Signal Notification Service, is not clearly defined.
> >
> > Where does it store things?
> >
> > Does it need persistent storage, or is in-memory good enough?
> >
> > Does it need at-most-once delivery semantics? At-least-once delivery?
> >
> > Does the scheduler pull from it, or does the signal service push to the scheduler?
> >
> > What happens when there is more than one scheduler running (AIP-15, which I am working on right now)?
> >
> > How do we run more than one notification service component (i.e. HA -- once the scheduler is HA we can't have any other new components being a single point of failure)?
> >
> > Do signals get delivered to just one scheduler, or all schedulers?
> >
> > 5. Does this now mean to get started out with Airflow you need to run three components: webserver, scheduler and signal service?
> >
> > (Although the performance on a laptop doesn't matter _too_ much, I am concerned that we keep the "getting started" experience as easy as possible for new users.)
> >
> > In short, I'd like to see _a lot_ more detail on this proposal. It's proposing a fundamental change to Airflow and there are still lots of things not covered in enough detail.
> >
> > I'm also not sure it's even needed for your example workflow. For instance, one way of dealing with it right now would be:
> >
> > - A task to start a streaming job. This is essentially fire-and-forget.
> > - A dag with schedule_interval=None that is "manually" invoked (by API request from within your streaming code) with these two tasks:
> >
> > model build task >> model validation task >> streaming "restart" task
> >
> > i.e. this manually triggered dag does the job of your signals proposal if I've understood your example correctly. I'm not saying this is a _good_ way of doing it, just A way. Have I correctly understood your example?
> >
> > -ash
> >
> > On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
> >
> > > Hi Gerard,
> > >
> > > If users follow the definition of SignalOperator correctly, the idea for the streaming-triggered-batch case is to restart the execution for evaluations of the online trained model. In other words, once the evaluation operator receives the signal from the online learning operator, the scheduler takes the RESTART action to restart the evaluation task on the indefinitely running DAGRun.
> > > When a new signal is received, the scheduler checks with the SignalOperators to determine the action of the operator and carries out that action. This scheduling also updates the state of the task instance rather than clearing the operator's state in the metadata database. At this point, the scheduler cuts out the subgraph from the DAG and constructs the sub-DAG to run.
> > >
> > > While keeping a history of operator execution, the scheduling could use signals and execution history to organize the signal conditions of the operators. Once the evaluation operator receives a new signal, the scheduler checks whether the signal and previous executions meet the conditions. If the received signal meets the corresponding condition and the executions of task instances also meet the conditions, the scheduler will take the RESTART action repeatedly for model evaluation.
> > >
> > > For supporting signal-based triggers, signal-based scheduling requires the introduction of SignalOperator for the operators that send signals. With the SignalOperator, the scheduler can use the signals of the operator and the corresponding condition to decide the action of the operator. No matter whether the relationship between upstream tasks and downstream tasks is one-to-one or one-to-many, this scheduling only checks whether the received signals of the SignalOperator meet the signal conditions, and which action to take when these conditions are met.
> > >
> > > By the way, signal-based scheduling needs a few more things to map signals to corresponding actions.
> > >
> > > Regards,
> > > Nicholas Jiang
> > >
> > > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez <gcasass...@twitter.com.invalid> wrote:
> > >
> > > > That looks interesting.
> > > > My main worry is how to handle multiple executions of signal-based operators. If I follow your definition correctly, the idea is to run multiple evaluations of the online trained model (on a permanently running DAGRun). So what happens once you have triggered the downstream operators using a signal? Do you clear the state of the operators once a new signal is generated? Do you generate a new DAGRun?
> > > >
> > > > How do you evaluate the model several times while keeping a history of operator execution?
> > > >
> > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which basically describes something similar to what you are proposing in this AIP: https://github.com/tensorflow/community/pull/253 (an interesting read, as it's also related to the ML field).
> > > >
> > > > My main concern would be that you may need to change a few more things to support signal-based triggers, as the relationship between upstream tasks and downstream tasks is no longer 1:1 but 1:many.
> > > >
> > > > Gerard Casas Saez
> > > > Twitter | Cortex | @casassaez
> > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > > > > Hello everyone,
> > > > >
> > > > > Sending a message to everyone and collecting feedback on AIP-35 on adding signal-based scheduling. This was previously briefly mentioned in the development Slack channel. The key motivation of this proposal is to support a mixture of batch and stream jobs in the same workflow.
> > > > >
> > > > > In practice, there are many pieces of business logic that need collaboration between streaming and batch jobs. For example, in machine learning, an online learning job is a streaming job running forever. It emits a model periodically, and a batch evaluate job should run to validate each model.
> > > > > So this is a streaming-triggered-batch case. If we continue from the above example, once the model passes the validation, the model needs to be deployed to a serving job. That serving job could be a streaming job that keeps polling records from Kafka and uses a model to do prediction. And the availability of a new model requires the streaming prediction job either to restart, or to reload the new model on the fly. In either case, it is a batch-to-streaming triggering.
> > > > >
> > > > > At this point, people basically have to invent their own way to deal with such workflows consisting of both batch and streaming jobs. I think having a system that can smoothly work with a mixture of streaming and batch jobs is valuable here.
> > > > >
> > > > > AIP-35 focuses on signal-based scheduling to let the operators send signals to the scheduler to trigger a scheduling action, such as starting jobs, stopping jobs and restarting jobs. With this change of scheduling mechanism, a streaming job can send signals to the scheduler to indicate the state change of the dependencies.
> > > > >
> > > > > Signal-based scheduling allows the scheduler to know about a change of dependency state immediately, without periodically querying the metadata database. This also allows potential support for richer scheduling semantics, such as periodic execution and manual triggering at per-operator granularity.
> > > > >
> > > > > Changes proposed:
> > > > >
> > > > > - Signals are used to define conditions that must be met to run an operator. State change of the upstream tasks is one type of signal. There may be other types of signals.
> > > > > The scheduler may take different actions when receiving different signals. To let the operators take signals as their starting condition, we propose to introduce SignalOperator, which is mentioned in the public interface section.
> > > > > - A notification service is necessary to help receive and propagate the signals from the operators and other sources to the scheduler. Upon receiving a signal, the scheduler can take action according to the predefined signal-based conditions on the operators. Therefore we propose to introduce a Signal Notification Service component to Airflow.
> > > > >
> > > > > Please see the related documents and PRs for details:
> > > > >
> > > > > AIP:
> > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > >
> > > > > Please let me know if there are any aspects that you agree/disagree with or need more clarification on (especially the SignalOperator and SignalService). Any comments are welcome and I am looking forward to them!
> > > > >
> > > > > Thanks,
> > > > > Nicholas
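[Editorial aside: to ground the signal-condition idea discussed throughout the thread, here is a rough sketch of how received signals might map to scheduler actions. All names are hypothetical and none of this is the AIP-35 API; it only illustrates the "common downstream task after either upstream finishes" case Gerard raises.]

```python
# Rough illustration (names hypothetical, NOT the AIP-35 API): a condition
# watches a set of signal names and, when any of them fires, yields a
# scheduler action such as RESTART; unrelated signals yield no action.
from enum import Enum
from typing import Set

class Action(Enum):
    NONE = "none"
    RESTART = "restart"

class AnyOfSignalCondition:
    """Trigger an action whenever any of the watched signals is received,
    i.e. 'run the common downstream task after either upstream finishes'."""

    def __init__(self, watched: Set[str], action: Action) -> None:
        self.watched = watched
        self.action = action

    def on_signal(self, name: str) -> Action:
        return self.action if name in self.watched else Action.NONE

# Re-run evaluation whenever new examples OR new embeddings are available.
cond = AnyOfSignalCondition({"new_examples", "new_embeddings"}, Action.RESTART)
actions = [cond.on_signal(s) for s in ("new_examples", "unrelated", "new_embeddings")]
```

An all-of variant (wait until every watched signal has fired) would cover the classic upstream-dependency semantics, which is how job status changes could become "just one type of signal" as Becket suggests.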