Hi Kevin and Ash,

Thanks for the feedback / comments / suggestions. It is great to know that
Airbnb has already been running stream jobs using AirFlow and there might
be a working solution.

First of all, I'd like to say that we are still quite new to the AirFlow,
our proposal likely has overlooked many things and that's why we would like
to get advices from the community. So we really appreciate suggestions and
feedback.

I can't agree more that we should introduce mechanisms to AirFlow instead
of just making it support some specific use cases. The two scenarios
mentioned by Nicholas above are just some examples to explain how
signal-based scheduling enables new possibility. It does not suggest that
our goal is just to make those two cases work.

Just to add some background to the proposal, the original motivation of
this AIP actually came from our project in Apache Flink community called AI
Flow[1][2]. It aims to define a clear API for people to describe their
machine learning workflows that contain both stream and batch jobs. We have
a basic implementation of signal-based scheduling but would like to switch
to AirFlow so we don't re-invent wheels.

In any case, it is probably useful to clarify a little bit on the rational
behind the proposal. The current AIP-35 proposes two major changes:

*Signal based-scheduling*
The key idea is to expand the conditions to trigger a scheduling action,
from job status changes to a more extensible concept of *signal* or *event.
*This concept extension brings two benefits:

   1. Make AirFlow capable of dealing with richer types of workflows
   including long running jobs more smoothly.
   Although the two cases mentioned above may be supported with AirFlow as
   is, the solutions seem a workaround and are not something designed as the
   first class citizen. It essentially relies on the user code to put together
   some disjoint workflow submissions to form a complete business logic.
   Ideally we would like to see the entire business logic as an integral
   workflow.
   2. Allow downstream projects to integrate with AirFlow more easily.
   As an example, imagine an operator in a workflow which deploys a model
   to an online inference service. In some cases, such model deployment also
   requires external approvals which is not a part of the workflow. With a
   scheduler taking external signals, it becomes much easier to integrate the
   workflow with internal approval systems.

*Signal notification service*
Signal notification service is also helpful in two aspects:

   1. It helps isolate the signal sender and signal receivers.
   That means an operator who sends the signal does not need to be aware of
   who might be listening to this signal and what action have to be taken. One
   signal may have multiple listeners and thus multiple workflow actions may
   be triggered.
   2. It allows pluggability and again integration with existing systems.
   For example, some users may decide to have a Kafka based notification
   service.

We do realize that this proposal is a significant change to AirFlow and
expect taking some time to sort everything out. I am wondering if doing the
following in order would help the discussion.

1. First reach consensus on whether this is worth doing or not.
2. If we think it is worth doing, what would be the ideal final state.
3. Come up with the implementation plan and migration path from the current
state to the ideal final state.

What do you think?

Re: Ash
About some of the questions and suggestions.

And I'm also not a fan of the name -- perhaps just my background and
> dealing with the scheduler and executor code -- but to me a signal is a
> Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described
> as "Event-driven Scheduling".


I don't have a strong opinion on the naming. We picked "signal" instead of
"event" simply because that sounds more relevant to "control". But I think
"event" is also a suitable term and it is also intuitive enough.

In terms of your proposed class layout:
>     class BaseOperator(SignalOperator, LoggingMixin):
> Don't create the extra class, just put new things on BaseOperator.


Tao raised a similar question in the google doc. I agree that it would work
by putting new things on the BaseOperator. I think one reason of having a
separate SignalOperator class is that theoretically speaking, the scheduler
only needs to deal with the SignalOperator interface. If so, we may want to
eventually migrate from the current BaseOperator to SignalOperator. The job
status change will just become one type of the signals. So the new
operators might be able to just implement a SignalOperator instead of
BaseOperator. Having a separate SignalOperator would make it easier to
eventually converge to SignalOperator and deprecate the current
BaseOperator.

Doesn't this require custom code inside the streaming job to report
> events back to Airflow?


For the job status change events / signals, users don't have to do
anything. It is just currently the job status are communicated back the
scheduler via Database, now it is going to be via the signal notification
service.
For the signals that is not job status related, user code needs to send a
signal / event at some point.

3. How do we notice if that long running task dies without a trace --
> i.e. if it crashes hard/the Kuberenetes node it is on dies, etc and
> never signals that it errored how does Airflow know?


In our basic implementation, we have a job status listener component that
can work with K8S / Yarn / Other cloud platforms, to keep track of the  job
status change. And the job status change signals are actually sent by it.
Does AirFlow right now also listen to the status of the jobs running in
K8S? Can you help us understand a bit more on the current failure handling
model of AirFlow?

4. You're new key component of th Signal Notification Service is not
> clearly defined.
>   Where does it store things?
>   Does it need persistent storage, or is in-memory good enough?
>   Does it need at-most-once delivery semantics? At-least-once delivery?
>   Does the scheduler pull from it, or does signal service push to the
> scheduler.
>   What happens when there are more than one scheduler running (AIP-15,
> which I am working on right now)
>   How do we run more than one notification service component (i.e. HA --
> once the scheduler is HA we can't have any other new components being a
> single point of failure)
>   Do signals get delivered to just one scheduler, or all schedulers?


Sorry about the confusion. It is true that a bunch of details are missed
here. We will add them into the wiki. To answer the questions quickly:

   - In our basic implementation, default signal notification service
   stores the signal in my sql.
   - The notification service has a REST API works in a long-pull manner,
   i.e. a client sends an http request with the last version of the keys it
   has seen, and that request parking waits on the server side until one of
   the following is met:
      - Request timeout is reached, in which case an empty response is
      returned.
      - A new version of the interested signal is available. The
      notification service returns all the SignalVersions newer than
the version
      last seen by the client.
   - The scheduler pulls from the notification service, but in a long-pull
   manner.
   - Off the top of my head, the notification service probably just needs
   one instance which supports multiple schedulers. It could have more
   instances for scalability.
   - Technically speaking the notification service frontend itself is
   stateless. So failure recovery is simple. So if the database of the
   notification service has HA. It should be sufficient.
   - Signals will be delivered to all the schedulers that are listening to
   that signal.

5. Does this now mean to get started out with Airflow you need to run
> three components: webserver, scheduler and signal service?


Not sure if there is a problem, but I imagine for a local deployment, a
signal service could be a part of the webserver, just with one more REST
endpoint.

Thanks again for all the great feedback and looking forward to your
suggestions.

Cheers,

Jiangjie (Becket) Qin

[1] https://www.youtube.com/watch?v=xiYJTCj2zUU
[2] https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow


On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <a...@apache.org> wrote:

> I also agree with Kevin - I'd love to see better streaming support in
> Airflow, but I'm not sure this is the way to go about it. Something
> about it feels not quite right.
>
> And I'm also not a fan of the name -- perhaps just my background and
> dealing with the scheduler and executor code -- but to me a signal is a
> Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described
> as "Event-driven Scheduling".
>
>
> To take your example:
>
> > For example, in machine learning, an online
> > learning job is a streaming job running forever. It emits a model
> > periodically and a batch evaluate job should run to validate each model
>
> This is possible right now in Airflow by making an API request to
> trigger a run of the validation DAG.
>
> In terms of your proposed class layout:
>
>     class BaseOperator(SignalOperator, LoggingMixin):
>
> Don't create the extra class, just put new things on BaseOperator.
>
> As Chris said:
>
> > Are you saying that you actually have tasks in Airflow that are
> > intended to run indefinitely?
>
> 1. How is Airflow meant to handle that task? Right now it runs it _and
> waits for completion_. It's not clear you've actually thought through
> everything this change would involve at the code level -- specifically
> in the Executors.
>
> 2. Doesn't this require custom code inside the streaming job to report
> events back to Airflow?
>
>   I am very uneasy about _needing_ to make changes in the "task code" to
> support running it under Airflow.
>
>   One of the big plus points to me is that right now with Airflow can
> run whatever tasks or jobs you like, without your job code needing to
> know or care about Airflow. This approach changes that. And this is not
> a decision to be made lightly.
>
> 3. How do we notice if that long running task dies without a trace --
> i.e. if it crashes hard/the Kuberenetes node it is on dies, etc and
> never signals that it errored how does Airflow know?
>
> 4. You're new key component of th Signal Notification Service is not
> clearly defined.
>
>   Where does it store things?
>
>   Does it need persistent storage, or is in-memory good enough?
>
>   Does it need at-most-once delivery semantics? At-least-once delivery?
>
>   Does the scheduler pull from it, or does signal service push to the
> scheduler.
>
>   What happens when there are more than one scheduler running (AIP-15,
> which I am working on right now)
>
>   How do we run more than one notification service component (i.e. HA --
> once the scheduler is HA we can't have any other new components being a
> single point of failure)
>
>   Do signals get delivered to just one scheduler, or all schedulers?
>
> 5. Does this now mean to get started out with Airflow you need to run
> three components: webserver, scheduler and signal service?
>
>   (Although the performance on a laptop doesn't matter _too_ much I am
> concerned that we keep the "getting started" experience as easy as
> possible for new users)
>
> In short I'd like to see _a lot_ more detail on this proposal. It's
> proposing a fundamental change to Airflow and there are still lots of
> things not covered in enough detail.
>
> I'm also not sure it's even needed for your example workflow. For
> instance one way of dealing with it right now would be:
>
> - A task to start a streaming job. This is essentially fire-and-forget.
> - A dag with schedule_interval=None that is "manually" invoked (by API
> request from within your streaming code) with these two tasks:
>
>       model build task >> model validation task >> streaming "restart" task
>
>   i.e. this manually triggerd dag does the job of your signals proposal
> if I've understood your example correctly. I'm not saying this is a
> _good_ way of doing it, just A way. Have I correctly understood your
> example?
>
>
> -ash
>
>
>
> On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
>
> > Hi Gerard,
> >
> > If users follow the definition of SignalOperator correctly, the idea for
> > the streaming-triggered-batch case is to restart the execution for
> > evaluations of the online trained model. In other words, once the
> > evaluation operator receives the signal from the online learning
> operator,
> > the scheduler takes RESTART action to restart the task of the
> > evaluation on
> > the indefinitely running DAGRun. Upon a new signal is received, the
> > scheduler checks with the SignalOperators to determine the action of
> > Operator and carries out that action. Then, this scheduling also updates
> > the state of the task instance, not clear the operator's state in the
> > metadata database. At this time, the scheduler cuts out the subgraph from
> > the DAG and constructs the sub DAG to run.
> >
> > While keeping a history of operator execution, the scheduling could use
> > signals and execution history to organize signal conditions of the
> > operators. Once the operator of the evaluation receives a new signal, the
> > scheduler checks whether the signal and previous executions are met with
> > the conditions. If the received signal is met with the corresponding
> > condition and the execution of task instances also meet the
> > conditions, the
> > scheduler would take the RESTART action repeatedly for model evaluation.
> >
> > For supporting signal based triggers, the signal-based scheduling
> requires
> > the introduction of SignalOperator for the operators that send signals.
> > With the SignalOperator, the scheduler could switch to use signals of the
> > operator and the corresponding condition to decide the action of the
> > operator. No matter whether the relationship between upstream tasks to
> > downstream tasks is one-to-one or one-to-many, this scheduling only
> checks
> > whether the received signals of the SignalOperator is met with the signal
> > conditions and concerns about which action to take when these conditions
> > are met.
> >
> > By the way, signal-based scheduling needs a few more things to map
> signals
> > to corresponding actions.
> >
> > Regards,
> > Nicholas Jiang
> >
> > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> > <gcasass...@twitter.com.invalid> wrote:
> >
> >> That looks interesting. My main worry is how to handle multiple
> executions
> >> of Signal based operators. If I follow your definition correctly, the
> idea
> >> is to run multiple evaluations of the online trained model (on a
> >> permanently running DAGRun). So what happens when you have triggered the
> >> downstream operators using a signal once? Do you clear state of the
> >> operators once a new signal is generated? Do you generate a new DAGRun?
> >>
> >> How do you evaluate the model several times while keeping a history of
> >> operator execution?
> >>
> >> Related to this, TFX has a similar proposal for ASYNC DAGs, which
> >> basically describe something similar to what you are proposing in
> >> this AIP:
> >> https://github.com/tensorflow/community/pull/253 (interesting read as
> >> it’s also related to the ML field).
> >>
> >> My main concern would be that you may need to change a few more
> >> things to
> >> support signal based triggers, as the relationship between upstream
> tasks
> >> to downstream tasks is no longer 1:1 but 1:many.
> >>
> >> Gerard Casas Saez
> >> Twitter | Cortex | @casassaez
> >> On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> >> > Hello everyone,
> >> >
> >> > Sending a message to everyone and collecting feedback on the AIP-35 on
> >> > adding signal-based scheduling. This was previously briefly
> >> mentioned in
> >> > the discussion of development slack channel. The key motivation of
> this
> >> > proposal is to support a mixture of batch and stream jobs in the same
> >> > workflow.
> >> >
> >> > In practice, there are many business logics that need collaboration
> >> between
> >> > streaming and batch jobs. For example, in machine learning, an online
> >> > learning job is a streaming job running forever. It emits a model
> >> > periodically and a batch evaluate job should run to validate each
> model.
> >> So
> >> > this is a streaming-triggered-batch case. If we continue from the
> above
> >> > example, once the model passes the validation, the model needs to be
> >> > deployed to a serving job. That serving job could be a streaming
> >> job that
> >> > keeps polling records from Kafka and uses a model to do prediction.
> And
> >> the
> >> > availability of a new model requires that streaming prediction job
> either
> >> > to restart, or to reload the new model on the fly. In either case,
> >> it is
> >> a
> >> > batch-to-streaming job triggering.
> >> >
> >> > At this point above, people basically have to invent their own way to
> >> deal
> >> > with such workflows consisting of both batch and streaming jobs. I
> think
> >> > having a system that can help smoothly work with a mixture of
> streaming
> >> and
> >> > batch jobs is valuable here.
> >> >
> >> > This AIP-35 focuses on signal-based scheduling to let the operators
> send
> >> > signals to the scheduler to trigger a scheduling action, such as
> starting
> >> > jobs, stopping jobs and restarting jobs. With the change of
> >> > scheduling mechanism, a streaming job can send signals to the
> scheduler
> >> to
> >> > indicate the state change of the dependencies.
> >> >
> >> > Signal-based scheduling allows the scheduler to know the change of the
> >> > dependency state immediately without periodically querying the
> metadata
> >> > database. This also allows potential support for richer scheduling
> >> > semantics such as periodic execution and manual trigger at per
> operator
> >> > granularity.
> >> >
> >> > Changes proposed:
> >> >
> >> > - Signals are used to define conditions that must be met to run an
> >> > operator. State change of the upstream tasks is one type of the
> signals.
> >> > There may be other types of signals. The scheduler may take different
> >> > actions when receiving different signals. To let the operators take
> >> signals
> >> > as their starting condition, we propose to introduce SignalOperator
> which
> >> > is mentioned in the public interface section.
> >> > - A notification service is necessary to help receive and propagate
> the
> >> > signals from the operators and other sources to the scheduler. Upon
> >> > receiving a signal, the scheduler can take action according to the
> >> > predefined signal-based conditions on the operators. Therefore we
> propose
> >> > to introduce a Signal Notification Service component to Airflow.
> >> >
> >> > Please see related documents and PRs for details:
> >> >
> >> > AIP:
> >> >
> >>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> >> > Issue: https://github.com/apache/airflow/issues/9321
> >> >
> >> > Please let me know if there are any aspects that you agree/disagree
> with
> >> or
> >> > need more clarification (especially the SignalOperator and
> >> SignalService).
> >> > Any comments are welcome and I am looking forward to it!
> >> >
> >> > Thanks,
> >> > Nicholas
> >>
> >
>

Reply via email to