Re-read this a couple times.  Some thoughts:

• I agree on calling it event-based scheduling; it may be less misleading.
• I believe we can mostly all agree that this is something interesting to add 
to Airflow and a use case that would be good to support. Nailing down the details 
and the path to implementation is important though (or at least that’s my understanding).
• In my opinion, the best way would be to extend BaseOperator to 
incorporate signals, as suggested before. Adding a new operator seems like it will 
make things more complex for the end user.
    • We should allow users to configure whether they want to use event-based 
scheduling (async/signal) or operator dependency scheduling (sync/scheduler). 
This would allow users to choose the mode they want to run their DAG in, and 
potentially also change the execution UI for the DAG.
• Still concerned about adding logic to issue signals in user code.
    • An alternative option here is to use XCom/data lineage artifacts as the 
signal backend. Aka when an operator A pushes a new value of an XCom keyed K, 
execute operator B with that XCom value (see the sketch after this list). Most 
of the use cases I can think of for event-based scheduling can adapt to this 
concept, where downstream operators need to run whenever a new data artifact is 
generated by an upstream operator.
    • The issue here is that we don't have a good data artifact representation in 
Airflow, as artifacts usually tend to be implicit (rather than explicit). Hopefully 
with Functional DAGs we can improve this.
• We should also consider how we would implement a use case where it's not just 
a long running task, but several tasks that run on different schedules and 
that need a common downstream task to be run after any of them finishes 
(example: train a new model every time new examples are available or new 
embeddings are available). This is why I think a more high-level design could 
benefit the proposal, aka async DAG execution.
• The UI is going to be affected by the clear-task-instance approach you describe. 
As a user I will only be able to see the latest task instance, and that’s no 
good. The web UI will need to be changed to accommodate this, and we will need to 
find a way to have multiple task instances for a single task_id, plus lineage 
visualization for understanding what event/task instance generated what 
execution of a task instance.
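
Sketching the XCom-as-signal idea above (only the xcom_push/xcom_pull calls are 
existing Airflow API; the event-driven trigger itself is the hypothetical part):

    # Sketch only: today B runs because of the a >> b dependency; under the
    # proposal it would be (re)triggered whenever the "model_path" XCom changes.
    from datetime import datetime
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    def produce(**context):
        # Operator A pushes a new value of the XCom keyed "model_path".
        context["ti"].xcom_push(key="model_path", value="s3://bucket/model/v2")

    def consume(**context):
        # Operator B runs with the latest value of that XCom.
        model_path = context["ti"].xcom_pull(task_ids="produce", key="model_path")
        print("evaluating", model_path)

    with DAG("xcom_signal_sketch", start_date=datetime(2020, 6, 1),
             schedule_interval=None) as dag:
        a = PythonOperator(task_id="produce", python_callable=produce,
                           provide_context=True)
        b = PythonOperator(task_id="consume", python_callable=consume,
                           provide_context=True)
        a >> b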


Gerard Casas Saez
Twitter | Cortex | @casassaez
On Jun 17, 2020, 10:22 AM -0600, Becket Qin <becket....@gmail.com>, wrote:
> Hi Kevin and Ash,
>
> Thanks for the feedback / comments / suggestions. It is great to know that
> Airbnb has already been running streaming jobs using Airflow and there might
> be a working solution.
>
> First of all, I'd like to say that we are still quite new to Airflow;
> our proposal has likely overlooked many things, and that's why we would like
> to get advice from the community. So we really appreciate suggestions and
> feedback.
>
> I can't agree more that we should introduce mechanisms to Airflow instead
> of just making it support some specific use cases. The two scenarios
> mentioned by Nicholas above are just some examples to explain how
> signal-based scheduling enables new possibilities. It does not mean that
> our goal is just to make those two cases work.
>
> Just to add some background to the proposal, the original motivation of
> this AIP actually came from our project in the Apache Flink community called AI
> Flow[1][2]. It aims to define a clear API for people to describe their
> machine learning workflows that contain both stream and batch jobs. We have
> a basic implementation of signal-based scheduling but would like to switch
> to Airflow so we don't reinvent the wheel.
>
> In any case, it is probably useful to clarify a little bit the rationale
> behind the proposal. The current AIP-35 proposes two major changes:
>
> *Signal-based scheduling*
> The key idea is to expand the conditions that trigger a scheduling action,
> from job status changes to a more extensible concept of *signal* or *event*.
> This concept extension brings two benefits:
>
> 1. Make Airflow capable of dealing more smoothly with richer types of
> workflows, including long running jobs.
> Although the two cases mentioned above may be supported with Airflow as
> is, the solutions seem like workarounds and are not designed as
> first-class citizens. They essentially rely on the user code to put together
> some disjoint workflow submissions to form the complete business logic.
> Ideally we would like to see the entire business logic as one integral
> workflow.
> 2. Allow downstream projects to integrate with Airflow more easily.
> As an example, imagine an operator in a workflow which deploys a model
> to an online inference service. In some cases, such model deployment also
> requires external approvals which are not part of the workflow. With a
> scheduler taking external signals, it becomes much easier to integrate the
> workflow with internal approval systems.
>
> *Signal notification service*
> The signal notification service is also helpful in two ways:
>
> 1. It helps isolate the signal sender and signal receivers.
> That means an operator that sends a signal does not need to be aware of
> who might be listening to it or what actions have to be taken. One
> signal may have multiple listeners and thus multiple workflow actions may
> be triggered.
> 2. It allows pluggability and again integration with existing systems.
> For example, some users may decide to have a Kafka-based notification
> service.
>
> We do realize that this proposal is a significant change to Airflow and
> expect it to take some time to sort everything out. I am wondering if doing the
> following in order would help the discussion:
>
> 1. First reach consensus on whether this is worth doing or not.
> 2. If we think it is worth doing, decide what the ideal final state would be.
> 3. Come up with the implementation plan and migration path from the current
> state to the ideal final state.
>
> What do you think?
>
> Re: Ash
> About some of the questions and suggestions.
>
> And I'm also not a fan of the name -- perhaps just my background and
> > dealing with the scheduler and executor code -- but to me a signal is a
> > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described
> > as "Event-driven Scheduling".
>
>
> I don't have a strong opinion on the naming. We picked "signal" instead of
> "event" simply because that sounds more relevant to "control". But I think
> "event" is also a suitable term and it is also intuitive enough.
>
> In terms of your proposed class layout:
> > class BaseOperator(SignalOperator, LoggingMixin):
> > Don't create the extra class, just put new things on BaseOperator.
>
>
> Tao raised a similar question in the Google doc. I agree that it would work
> to put new things on the BaseOperator. I think one reason for having a
> separate SignalOperator class is that, theoretically speaking, the scheduler
> only needs to deal with the SignalOperator interface. If so, we may want to
> eventually migrate from the current BaseOperator to SignalOperator. The job
> status change would just become one type of signal. So new
> operators might be able to just implement SignalOperator instead of
> BaseOperator. Having a separate SignalOperator would make it easier to
> eventually converge on SignalOperator and deprecate the current
> BaseOperator.
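>
> To make the class layout concrete, here is a rough sketch of what the
> SignalOperator interface could look like. The method names below are purely
> illustrative and not part of the AIP as written:
>
>     # Hypothetical sketch only; these names are not existing Airflow API.
>     from abc import ABC, abstractmethod
>     from airflow.utils.log.logging_mixin import LoggingMixin
>
>     class SignalOperator(ABC):
>         """An operator scheduled on signals/events, not only upstream task state."""
>
>         @abstractmethod
>         def signal_conditions(self):
>             """Return the signals that must be observed before this operator runs."""
>
>         @abstractmethod
>         def on_signal(self, signal):
>             """Tell the scheduler which action (START/STOP/RESTART/NONE) to take."""
>
>     class BaseOperator(SignalOperator, LoggingMixin):
>         # Upstream task state changes become just one type of signal, so
>         # existing operators keep working unchanged.
>         def signal_conditions(self):
>             return []
>
>         def on_signal(self, signal):
>             return "START"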
>
> Doesn't this require custom code inside the streaming job to report
> > events back to Airflow?
>
>
> For the job status change events / signals, users don't have to do
> anything. It is just that currently the job status is communicated back to the
> scheduler via the database; now it would go via the signal notification
> service.
> For signals that are not job-status related, user code needs to send a
> signal / event at some point (see the small sketch below).
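>
> As a hedged illustration only (the helper and endpoint below are entirely
> hypothetical, not existing API), "sending a signal" from user code might look
> something like:
>
>     # Hypothetical illustration; this client and endpoint are not existing API.
>     import requests
>
>     def send_signal(key, value, base_url="http://notification-service:8080"):
>         # POST the signal/event to the (hypothetical) notification service.
>         resp = requests.post(f"{base_url}/signals", json={"key": key, "value": value})
>         resp.raise_for_status()
>
>     send_signal("new_model", "s3://bucket/model/v3")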
>
> 3. How do we notice if that long running task dies without a trace --
> > i.e. if it crashes hard/the Kubernetes node it is on dies, etc and
> > never signals that it errored how does Airflow know?
>
>
> In our basic implementation, we have a job status listener component that
> can work with K8S / YARN / other cloud platforms to keep track of job
> status changes, and the job status change signals are actually sent by it.
> Does Airflow right now also listen to the status of jobs running in
> K8S? Can you help us understand a bit more about the current failure handling
> model of Airflow?
>
> 4. Your new key component, the Signal Notification Service, is not
> > clearly defined.
> > Where does it store things?
> > Does it need persistent storage, or is in-memory good enough?
> > Does it need at-most-once delivery semantics? At-least-once delivery?
> > Does the scheduler pull from it, or does signal service push to the
> > scheduler.
> > What happens when there are more than one scheduler running (AIP-15,
> > which I am working on right now)
> > How do we run more than one notification service component (i.e. HA --
> > once the scheduler is HA we can't have any other new components being a
> > single point of failure)
> > Do signals get delivered to just one scheduler, or all schedulers?
>
>
> Sorry about the confusion. It is true that a bunch of details are missing
> here. We will add them to the wiki. To answer the questions quickly:
>
> - In our basic implementation, the default signal notification service
> stores the signals in MySQL.
> - The notification service has a REST API that works in a long-poll manner,
> i.e. a client sends an HTTP request with the last version of the keys it
> has seen, and that request parks and waits on the server side until one of
> the following is met:
> - The request timeout is reached, in which case an empty response is
> returned.
> - A new version of a signal of interest is available. The
> notification service returns all the SignalVersions newer than the version
> last seen by the client.
> - The scheduler pulls from the notification service, but in a long-poll
> manner.
> - Off the top of my head, the notification service probably just needs
> one instance which supports multiple schedulers. It could have more
> instances for scalability.
> - Technically speaking, the notification service frontend itself is
> stateless, so failure recovery is simple. If the database of the
> notification service has HA, that should be sufficient.
> - Signals will be delivered to all the schedulers that are listening to
> that signal.
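>
> To make the long-poll interaction above concrete, here is a minimal
> client-side sketch. The endpoint, parameters, and response shape are
> hypothetical; this is just one way the described behavior could look:
>
>     # Hypothetical long-poll client; endpoint and payload shape are illustrative.
>     import requests
>
>     NOTIFICATION_URL = "http://localhost:8080/api/v1/signals/watch"  # hypothetical
>
>     def watch_signal(key, last_version, server_timeout=30):
>         """Block until `key` has a version newer than `last_version`."""
>         while True:
>             resp = requests.get(
>                 NOTIFICATION_URL,
>                 params={"key": key, "last_version": last_version,
>                         "timeout": server_timeout},
>                 timeout=server_timeout + 5,  # a bit longer than the server-side park
>             )
>             resp.raise_for_status()
>             updates = resp.json()  # e.g. [{"key": ..., "version": ..., "value": ...}]
>             if updates:
>                 return updates  # all SignalVersions newer than last_version
>             # Empty response: the server-side timeout was reached, poll again.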
>
> 5. Does this now mean to get started out with Airflow you need to run
> > three components: webserver, scheduler and signal service?
>
>
> Not sure if there is a problem, but I imagine for a local deployment, a
> signal service could be a part of the webserver, just with one more REST
> endpoint.
>
> Thanks again for all the great feedback and looking forward to your
> suggestions.
>
> Cheers,
>
> Jiangjie (Becket) Qin
>
> [1] https://www.youtube.com/watch?v=xiYJTCj2zUU
> [2] https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow
>
>
> On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>
> > I also agree with Kevin - I'd love to see better streaming support in
> > Airflow, but I'm not sure this is the way to go about it. Something
> > about it feels not quite right.
> >
> > And I'm also not a fan of the name -- perhaps just my background and
> > dealing with the scheduler and executor code -- but to me a signal is a
> > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described
> > as "Event-driven Scheduling".
> >
> >
> > To take your example:
> >
> > > For example, in machine learning, an online
> > > learning job is a streaming job running forever. It emits a model
> > > periodically and a batch evaluate job should run to validate each model
> >
> > This is possible right now in Airflow by making an API request to
> > trigger a run of the validation DAG.
> >
> > In terms of your proposed class layout:
> >
> > class BaseOperator(SignalOperator, LoggingMixin):
> >
> > Don't create the extra class, just put new things on BaseOperator.
> >
> > As Chris said:
> >
> > > Are you saying that you actually have tasks in Airflow that are
> > > intended to run indefinitely?
> >
> > 1. How is Airflow meant to handle that task? Right now it runs it _and
> > waits for completion_. It's not clear you've actually thought through
> > everything this change would involve at the code level -- specifically
> > in the Executors.
> >
> > 2. Doesn't this require custom code inside the streaming job to report
> > events back to Airflow?
> >
> > I am very uneasy about _needing_ to make changes in the "task code" to
> > support running it under Airflow.
> >
> > One of the big plus points to me is that right now Airflow can
> > run whatever tasks or jobs you like, without your job code needing to
> > know or care about Airflow. This approach changes that. And this is not
> > a decision to be made lightly.
> >
> > 3. How do we notice if that long running task dies without a trace --
> > i.e. if it crashes hard/the Kubernetes node it is on dies, etc and
> > never signals that it errored how does Airflow know?
> >
> > 4. Your new key component, the Signal Notification Service, is not
> > clearly defined.
> >
> > Where does it store things?
> >
> > Does it need persistent storage, or is in-memory good enough?
> >
> > Does it need at-most-once delivery semantics? At-least-once delivery?
> >
> > Does the scheduler pull from it, or does the signal service push to the
> > scheduler?
> >
> > What happens when there is more than one scheduler running (AIP-15,
> > which I am working on right now)?
> >
> > How do we run more than one notification service component (i.e. HA --
> > once the scheduler is HA we can't have any other new components being a
> > single point of failure)
> >
> > Do signals get delivered to just one scheduler, or all schedulers?
> >
> > 5. Does this now mean to get started out with Airflow you need to run
> > three components: webserver, scheduler and signal service?
> >
> > (Although the performance on a laptop doesn't matter _too_ much I am
> > concerned that we keep the "getting started" experience as easy as
> > possible for new users)
> >
> > In short I'd like to see _a lot_ more detail on this proposal. It's
> > proposing a fundamental change to Airflow and there are still lots of
> > things not covered in enough detail.
> >
> > I'm also not sure it's even needed for your example workflow. For
> > instance one way of dealing with it right now would be:
> >
> > - A task to start a streaming job. This is essentially fire-and-forget.
> > - A DAG with schedule_interval=None that is "manually" invoked (by API
> > request from within your streaming code) with these tasks:
> >
> > model build task >> model validation task >> streaming "restart" task
> >
> > i.e. this manually triggered DAG does the job of your signals proposal
> > if I've understood your example correctly. I'm not saying this is a
> > _good_ way of doing it, just A way. Have I correctly understood your
> > example?
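> >
> > (A rough sketch of that workaround, with illustrative DAG id and callables;
> > the only real APIs used are the DAG/PythonOperator classes and the
> > experimental trigger endpoint/CLI:)
> >
> >     # Illustrative sketch of the fire-and-forget + manually triggered DAG idea.
> >     from datetime import datetime
> >     from airflow.models import DAG
> >     from airflow.operators.python_operator import PythonOperator
> >
> >     with DAG("model_validation", start_date=datetime(2020, 6, 1),
> >              schedule_interval=None) as dag:
> >         build = PythonOperator(task_id="model_build", python_callable=lambda: None)
> >         validate = PythonOperator(task_id="model_validation", python_callable=lambda: None)
> >         restart = PythonOperator(task_id="streaming_restart", python_callable=lambda: None)
> >         build >> validate >> restart
> >
> >     # The streaming code then "manually" triggers a run, e.g. via the
> >     # experimental REST API or the CLI:
> >     #   POST /api/experimental/dags/model_validation/dag_runs
> >     #   airflow trigger_dag model_validation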
> >
> >
> > -ash
> >
> >
> >
> > On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
> >
> > > Hi Gerard,
> > >
> > > If users follow the definition of SignalOperator correctly, the idea for
> > > the streaming-triggered-batch case is to restart the execution of the
> > > evaluation of the online trained model. In other words, once the
> > > evaluation operator receives the signal from the online learning operator,
> > > the scheduler takes the RESTART action to restart the evaluation task on
> > > the indefinitely running DAGRun. When a new signal is received, the
> > > scheduler checks with the SignalOperators to determine the action for each
> > > operator and carries out that action. This scheduling also updates
> > > the state of the task instance rather than clearing the operator's state in
> > > the metadata database. At this point, the scheduler cuts the subgraph out of
> > > the DAG and constructs the sub-DAG to run.
> > >
> > > While keeping a history of operator execution, the scheduler could use
> > > signals and execution history to evaluate the signal conditions of the
> > > operators. Once the evaluation operator receives a new signal, the
> > > scheduler checks whether the signal and the previous executions meet
> > > the conditions. If the received signal meets the corresponding condition
> > > and the executions of the task instances also meet the conditions, the
> > > scheduler would take the RESTART action repeatedly for model evaluation.
> > >
> > > To support signal-based triggers, signal-based scheduling requires
> > > the introduction of SignalOperator for the operators that send signals.
> > > With the SignalOperator, the scheduler can use the signals of the
> > > operator and the corresponding conditions to decide the action for the
> > > operator. No matter whether the relationship between upstream tasks and
> > > downstream tasks is one-to-one or one-to-many, this scheduling only checks
> > > whether the received signals of the SignalOperator meet the signal
> > > conditions, and is only concerned with which action to take when these
> > > conditions are met.
> > >
> > > By the way, signal-based scheduling needs a few more things to map signals
> > > to corresponding actions.
> > >
> > > Regards,
> > > Nicholas Jiang
> > >
> > > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> > > <gcasass...@twitter.com.invalid> wrote:
> > >
> > > > That looks interesting. My main worry is how to handle multiple executions
> > > > of signal-based operators. If I follow your definition correctly, the idea
> > > > is to run multiple evaluations of the online trained model (on a
> > > > permanently running DAGRun). So what happens when you have triggered the
> > > > downstream operators using a signal once? Do you clear the state of the
> > > > operators once a new signal is generated? Do you generate a new DAGRun?
> > > >
> > > > How do you evaluate the model several times while keeping a history of
> > > > operator execution?
> > > >
> > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which
> > > > basically describes something similar to what you are proposing in
> > > > this AIP:
> > > > https://github.com/tensorflow/community/pull/253 (interesting read as
> > > > it’s also related to the ML field).
> > > >
> > > > My main concern would be that you may need to change a few more things to
> > > > support signal-based triggers, as the relationship between upstream tasks
> > > > and downstream tasks is no longer 1:1 but 1:many.
> > > >
> > > > Gerard Casas Saez
> > > > Twitter | Cortex | @casassaez
> > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > > > > Hello everyone,
> > > > >
> > > > > Sending a message to everyone and collecting feedback on AIP-35 on
> > > > > adding signal-based scheduling. This was previously briefly mentioned in
> > > > > the discussion on the development Slack channel. The key motivation of
> > > > > this proposal is to support a mixture of batch and stream jobs in the
> > > > > same workflow.
> > > > >
> > > > > In practice, there are many pieces of business logic that need
> > > > > collaboration between streaming and batch jobs. For example, in machine
> > > > > learning, an online learning job is a streaming job that runs forever. It
> > > > > emits a model periodically, and a batch evaluation job should run to
> > > > > validate each model. So this is a streaming-triggered-batch case. If we
> > > > > continue from the above example, once the model passes the validation,
> > > > > the model needs to be deployed to a serving job. That serving job could
> > > > > be a streaming job that keeps polling records from Kafka and uses a model
> > > > > to do prediction. And the availability of a new model requires the
> > > > > streaming prediction job either to restart, or to reload the new model on
> > > > > the fly. In either case, it is a batch-to-streaming job trigger.
> > > > >
> > > > > At this point, people basically have to invent their own way to deal
> > > > > with such workflows consisting of both batch and streaming jobs. I think
> > > > > having a system that helps them work smoothly with a mixture of streaming
> > > > > and batch jobs is valuable here.
> > > > >
> > > > > This AIP-35 focuses on signal-based scheduling to let the operators send
> > > > > signals to the scheduler to trigger a scheduling action, such as starting
> > > > > jobs, stopping jobs and restarting jobs. With the change of scheduling
> > > > > mechanism, a streaming job can send signals to the scheduler to indicate
> > > > > the state change of the dependencies.
> > > > >
> > > > > Signal-based scheduling allows the scheduler to know about the change of
> > > > > the dependency state immediately without periodically querying the
> > > > > metadata database. This also allows potential support for richer
> > > > > scheduling semantics such as periodic execution and manual triggering at
> > > > > per-operator granularity.
> > > > >
> > > > > Changes proposed:
> > > > >
> > > > > - Signals are used to define conditions that must be met to run an
> > > > > operator. A state change of the upstream tasks is one type of signal;
> > > > > there may be other types of signals. The scheduler may take different
> > > > > actions when receiving different signals. To let the operators take
> > > > > signals as their starting condition, we propose to introduce
> > > > > SignalOperator, which is mentioned in the public interface section.
> > > > > - A notification service is necessary to help receive and propagate the
> > > > > signals from the operators and other sources to the scheduler. Upon
> > > > > receiving a signal, the scheduler can take action according to the
> > > > > predefined signal-based conditions on the operators. Therefore we propose
> > > > > to introduce a Signal Notification Service component to Airflow.
> > > > >
> > > > > Please see related documents and PRs for details:
> > > > >
> > > > > AIP:
> > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > >
> > > > > Please let me know if there are any aspects that you agree/disagree with
> > > > > or need more clarification on (especially the SignalOperator and
> > > > > SignalService).
> > > > > Any comments are welcome and I am looking forward to them!
> > > > >
> > > > > Thanks,
> > > > > Nicholas
> > > >
> > >
> >
