Metacomment:
You might want to consider moving this discussion to a google doc or
something since it seems like a lot of divergent threads are being created
due to the scope of this change, which can be a bit hard to keep track of
in email. If folks are concerned about history we can dump the google doc
contents/comments into the AIP at the end. I see that you mentioned a
google doc but I didn't see a link so I'm assuming it was some private one.


At a high level I think this is a really great idea, and will unlock the
ability to remove a lot of the bottlenecks of Airflow that are based around
the current polling-based task scheduling, and also prevent the need for
sensor operators in a lot of cases.

1.

>  I'm in general supportive of this idea of supporting streaming jobs.

It's my understanding that the scope of this PR is not to support streaming
tasks in Airflow (Nicholas if I'm incorrect let me know since this is an
important decision). Airflow isn't really designed to work well with
streaming, typically you use an http operator or something like that to
send signal requests etc. Otherwise AIrflow would need to become kind of a
service orchestrator which I think would be outside of the scope of what
Airflow is designed to do and would be hard to implement in a clean way
using Airflow's current primitives like tasks/dagruns/xcom, though it's
possible I am wrong. I haven't been at Airbnb for a while now, so take this
with a grain of salt, but the usage of Airflow as a service orchestrator
IIRC was more of a hack to work around the fact that there was no easy way
to easily launch lightweight python services in Kubernetes, so Airflow
tasks were used instead.

2. Have we thought through the interactions between signals and existing
Airflow abstractions especially with respect to edge cases, e.g. handling
of missed signals, duplicated signals, zombie signals that no longer exist,
interactions of STOP and RESTART with task retries, interactions of signal
actions (e.g. start task) and scheduler actions (e.g. start task due to
cron schedule), how signals will work with priority mechanisms like task
priorities/non_pooled_task_slot_count etc, how each signal will interact
with each task state (e.g. user marks tasks failedi in the UI), or will
that happen in a subsequent doc/pull request?

In particular I'm curious about the interactions between scheduler polling
logic that results in task state changes and the new signals system as I'm
concerned that having both systems will add complexity and we should tryto
standardize on one (e.g. instead of a task getting retried by the scheduler
if it runs a polling loop and sees that the task received a failed signal
OR the task getting retried by the scheduler receiving a restart task,
instead we remove the polling logic and replace it with a signal handler
for tasks that send a FAILED signal). I'm also curious how the REST API
plays into this since it feels like there is some overlap with that as well.

Another example of something that should probably be moved to the signal
system is this part of the AIP:

>   3. Regarding this section in the AIP:
>     Step 3. Each available worker pulls a TaskInstance from the queue and
> executes it, sends a signal to  the notification service a signal
> indicating the task state changes from “queued” to “running”. The
> scheduling loop will then update the task state in the database when
> receiving this signal..
>   Shouldn't the DB update logic happen in the schedule signal listener
> instead of the poll-based scheduling loop?



On Thu, Jun 18, 2020 at 3:00 PM Gerard Casas Saez
<gcasass...@twitter.com.invalid> wrote:

> Re-read this a couple times.  Some thoughts:
>
> • I agree on calling it event based scheduling, may be less misleading.
> • I believe we can mostly all agree that this is something interesting to
> add to Airflow and a use case that would be good to support. Nailing the
> details and path to implement is important though. (or at least that’s my
> understanding)
> • In my opinion, I think the best way would be to extend BaseOperator to
> incorporate signals as suggested before. Adding a new operator seems it
> will make things more complex for the end user.
>     • We should allow user to configure if they want to use Event based
> scheduling (async/signal) or operator dependency scheduling
> (sync/scheduler). This would allow users to choose the mode they want to
> run their DAG, and potentially also change the execution UI for the DAG.
> • Still concerned on adding logic to issue signals in user code.
>     • An alternative option here is to use XCom/data lineage artifacts for
> signal backend. Aka when an operator A pushes a new value of XCom keyed K,
> execute operator B with that XCom value. Most of the use cases I can think
> of for even based scheduling can adapt to this concept where downstream
> operators need to run whenever a new data artifact is generate by an
> upstream artifact.
>     • Issue here is we don't have good data artifact representation on
> Airflow as they usually tend to be implicit (over explicit). Hopefully w
> Functional DAGs we can improve this.
> • We should also consider how would we implement a use case where its not
> just a long running task, but just several tasks that run in different
> schedules and that need a common downstream tasks to be run after either of
> them finishes. (example: train a new model every time new examples are
> available or new embeddings are available). This is why I think a more high
> level design could benefit the proposal. Aka async DAG execution.
> • UI is going to be affected by the described clear task instance you
> describe. As a user I will only be able to see the latest task instance and
> that’s no good. Web UI will need to be changed to accommodate this and we
> will need to find a way to have multiple task instances of a single task_id
> + lineage visualization for understanding what event/task instance
> generated what execution of a task instance.
>
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
> On Jun 17, 2020, 10:22 AM -0600, Becket Qin <becket....@gmail.com>, wrote:
> > Hi Kevin and Ash,
> >
> > Thanks for the feedback / comments / suggestions. It is great to know
> that
> > Airbnb has already been running stream jobs using AirFlow and there might
> > be a working solution.
> >
> > First of all, I'd like to say that we are still quite new to the AirFlow,
> > our proposal likely has overlooked many things and that's why we would
> like
> > to get advices from the community. So we really appreciate suggestions
> and
> > feedback.
> >
> > I can't agree more that we should introduce mechanisms to AirFlow instead
> > of just making it support some specific use cases. The two scenarios
> > mentioned by Nicholas above are just some examples to explain how
> > signal-based scheduling enables new possibility. It does not suggest that
> > our goal is just to make those two cases work.
> >
> > Just to add some background to the proposal, the original motivation of
> > this AIP actually came from our project in Apache Flink community called
> AI
> > Flow[1][2]. It aims to define a clear API for people to describe their
> > machine learning workflows that contain both stream and batch jobs. We
> have
> > a basic implementation of signal-based scheduling but would like to
> switch
> > to AirFlow so we don't re-invent wheels.
> >
> > In any case, it is probably useful to clarify a little bit on the
> rational
> > behind the proposal. The current AIP-35 proposes two major changes:
> >
> > *Signal based-scheduling*
> > The key idea is to expand the conditions to trigger a scheduling action,
> > from job status changes to a more extensible concept of *signal* or
> *event.
> > *This concept extension brings two benefits:
> >
> > 1. Make AirFlow capable of dealing with richer types of workflows
> > including long running jobs more smoothly.
> > Although the two cases mentioned above may be supported with AirFlow as
> > is, the solutions seem a workaround and are not something designed as the
> > first class citizen. It essentially relies on the user code to put
> together
> > some disjoint workflow submissions to form a complete business logic.
> > Ideally we would like to see the entire business logic as an integral
> > workflow.
> > 2. Allow downstream projects to integrate with AirFlow more easily.
> > As an example, imagine an operator in a workflow which deploys a model
> > to an online inference service. In some cases, such model deployment also
> > requires external approvals which is not a part of the workflow. With a
> > scheduler taking external signals, it becomes much easier to integrate
> the
> > workflow with internal approval systems.
> >
> > *Signal notification service*
> > Signal notification service is also helpful in two aspects:
> >
> > 1. It helps isolate the signal sender and signal receivers.
> > That means an operator who sends the signal does not need to be aware of
> > who might be listening to this signal and what action have to be taken.
> One
> > signal may have multiple listeners and thus multiple workflow actions may
> > be triggered.
> > 2. It allows pluggability and again integration with existing systems.
> > For example, some users may decide to have a Kafka based notification
> > service.
> >
> > We do realize that this proposal is a significant change to AirFlow and
> > expect taking some time to sort everything out. I am wondering if doing
> the
> > following in order would help the discussion.
> >
> > 1. First reach consensus on whether this is worth doing or not.
> > 2. If we think it is worth doing, what would be the ideal final state.
> > 3. Come up with the implementation plan and migration path from the
> current
> > state to the ideal final state.
> >
> > What do you think?
> >
> > Re: Ash
> > About some of the questions and suggestions.
> >
> > And I'm also not a fan of the name -- perhaps just my background and
> > > dealing with the scheduler and executor code -- but to me a signal is a
> > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> described
> > > as "Event-driven Scheduling".
> >
> >
> > I don't have a strong opinion on the naming. We picked "signal" instead
> of
> > "event" simply because that sounds more relevant to "control". But I
> think
> > "event" is also a suitable term and it is also intuitive enough.
> >
> > In terms of your proposed class layout:
> > > class BaseOperator(SignalOperator, LoggingMixin):
> > > Don't create the extra class, just put new things on BaseOperator.
> >
> >
> > Tao raised a similar question in the google doc. I agree that it would
> work
> > by putting new things on the BaseOperator. I think one reason of having a
> > separate SignalOperator class is that theoretically speaking, the
> scheduler
> > only needs to deal with the SignalOperator interface. If so, we may want
> to
> > eventually migrate from the current BaseOperator to SignalOperator. The
> job
> > status change will just become one type of the signals. So the new
> > operators might be able to just implement a SignalOperator instead of
> > BaseOperator. Having a separate SignalOperator would make it easier to
> > eventually converge to SignalOperator and deprecate the current
> > BaseOperator.
> >
> > Doesn't this require custom code inside the streaming job to report
> > > events back to Airflow?
> >
> >
> > For the job status change events / signals, users don't have to do
> > anything. It is just currently the job status are communicated back the
> > scheduler via Database, now it is going to be via the signal notification
> > service.
> > For the signals that is not job status related, user code needs to send a
> > signal / event at some point.
> >
> > 3. How do we notice if that long running task dies without a trace --
> > > i.e. if it crashes hard/the Kuberenetes node it is on dies, etc and
> > > never signals that it errored how does Airflow know?
> >
> >
> > In our basic implementation, we have a job status listener component that
> > can work with K8S / Yarn / Other cloud platforms, to keep track of the
> job
> > status change. And the job status change signals are actually sent by it.
> > Does AirFlow right now also listen to the status of the jobs running in
> > K8S? Can you help us understand a bit more on the current failure
> handling
> > model of AirFlow?
> >
> > 4. You're new key component of th Signal Notification Service is not
> > > clearly defined.
> > > Where does it store things?
> > > Does it need persistent storage, or is in-memory good enough?
> > > Does it need at-most-once delivery semantics? At-least-once delivery?
> > > Does the scheduler pull from it, or does signal service push to the
> > > scheduler.
> > > What happens when there are more than one scheduler running (AIP-15,
> > > which I am working on right now)
> > > How do we run more than one notification service component (i.e. HA --
> > > once the scheduler is HA we can't have any other new components being a
> > > single point of failure)
> > > Do signals get delivered to just one scheduler, or all schedulers?
> >
> >
> > Sorry about the confusion. It is true that a bunch of details are missed
> > here. We will add them into the wiki. To answer the questions quickly:
> >
> > - In our basic implementation, default signal notification service
> > stores the signal in my sql.
> > - The notification service has a REST API works in a long-pull manner,
> > i.e. a client sends an http request with the last version of the keys it
> > has seen, and that request parking waits on the server side until one of
> > the following is met:
> > - Request timeout is reached, in which case an empty response is
> > returned.
> > - A new version of the interested signal is available. The
> > notification service returns all the SignalVersions newer than
> > the version
> > last seen by the client.
> > - The scheduler pulls from the notification service, but in a long-pull
> > manner.
> > - Off the top of my head, the notification service probably just needs
> > one instance which supports multiple schedulers. It could have more
> > instances for scalability.
> > - Technically speaking the notification service frontend itself is
> > stateless. So failure recovery is simple. So if the database of the
> > notification service has HA. It should be sufficient.
> > - Signals will be delivered to all the schedulers that are listening to
> > that signal.
> >
> > 5. Does this now mean to get started out with Airflow you need to run
> > > three components: webserver, scheduler and signal service?
> >
> >
> > Not sure if there is a problem, but I imagine for a local deployment, a
> > signal service could be a part of the webserver, just with one more REST
> > endpoint.
> >
> > Thanks again for all the great feedback and looking forward to your
> > suggestions.
> >
> > Cheers,
> >
> > Jiangjie (Becket) Qin
> >
> > [1] https://www.youtube.com/watch?v=xiYJTCj2zUU
> > [2]
> https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow
> >
> >
> > On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <a...@apache.org>
> wrote:
> >
> > > I also agree with Kevin - I'd love to see better streaming support in
> > > Airflow, but I'm not sure this is the way to go about it. Something
> > > about it feels not quite right.
> > >
> > > And I'm also not a fan of the name -- perhaps just my background and
> > > dealing with the scheduler and executor code -- but to me a signal is a
> > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> described
> > > as "Event-driven Scheduling".
> > >
> > >
> > > To take your example:
> > >
> > > > For example, in machine learning, an online
> > > > learning job is a streaming job running forever. It emits a model
> > > > periodically and a batch evaluate job should run to validate each
> model
> > >
> > > This is possible right now in Airflow by making an API request to
> > > trigger a run of the validation DAG.
> > >
> > > In terms of your proposed class layout:
> > >
> > > class BaseOperator(SignalOperator, LoggingMixin):
> > >
> > > Don't create the extra class, just put new things on BaseOperator.
> > >
> > > As Chris said:
> > >
> > > > Are you saying that you actually have tasks in Airflow that are
> > > > intended to run indefinitely?
> > >
> > > 1. How is Airflow meant to handle that task? Right now it runs it _and
> > > waits for completion_. It's not clear you've actually thought through
> > > everything this change would involve at the code level -- specifically
> > > in the Executors.
> > >
> > > 2. Doesn't this require custom code inside the streaming job to report
> > > events back to Airflow?
> > >
> > > I am very uneasy about _needing_ to make changes in the "task code" to
> > > support running it under Airflow.
> > >
> > > One of the big plus points to me is that right now with Airflow can
> > > run whatever tasks or jobs you like, without your job code needing to
> > > know or care about Airflow. This approach changes that. And this is not
> > > a decision to be made lightly.
> > >
> > > 3. How do we notice if that long running task dies without a trace --
> > > i.e. if it crashes hard/the Kuberenetes node it is on dies, etc and
> > > never signals that it errored how does Airflow know?
> > >
> > > 4. You're new key component of th Signal Notification Service is not
> > > clearly defined.
> > >
> > > Where does it store things?
> > >
> > > Does it need persistent storage, or is in-memory good enough?
> > >
> > > Does it need at-most-once delivery semantics? At-least-once delivery?
> > >
> > > Does the scheduler pull from it, or does signal service push to the
> > > scheduler.
> > >
> > > What happens when there are more than one scheduler running (AIP-15,
> > > which I am working on right now)
> > >
> > > How do we run more than one notification service component (i.e. HA --
> > > once the scheduler is HA we can't have any other new components being a
> > > single point of failure)
> > >
> > > Do signals get delivered to just one scheduler, or all schedulers?
> > >
> > > 5. Does this now mean to get started out with Airflow you need to run
> > > three components: webserver, scheduler and signal service?
> > >
> > > (Although the performance on a laptop doesn't matter _too_ much I am
> > > concerned that we keep the "getting started" experience as easy as
> > > possible for new users)
> > >
> > > In short I'd like to see _a lot_ more detail on this proposal. It's
> > > proposing a fundamental change to Airflow and there are still lots of
> > > things not covered in enough detail.
> > >
> > > I'm also not sure it's even needed for your example workflow. For
> > > instance one way of dealing with it right now would be:
> > >
> > > - A task to start a streaming job. This is essentially fire-and-forget.
> > > - A dag with schedule_interval=None that is "manually" invoked (by API
> > > request from within your streaming code) with these two tasks:
> > >
> > > model build task >> model validation task >> streaming "restart" task
> > >
> > > i.e. this manually triggerd dag does the job of your signals proposal
> > > if I've understood your example correctly. I'm not saying this is a
> > > _good_ way of doing it, just A way. Have I correctly understood your
> > > example?
> > >
> > >
> > > -ash
> > >
> > >
> > >
> > > On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
> > >
> > > > Hi Gerard,
> > > >
> > > > If users follow the definition of SignalOperator correctly, the idea
> for
> > > > the streaming-triggered-batch case is to restart the execution for
> > > > evaluations of the online trained model. In other words, once the
> > > > evaluation operator receives the signal from the online learning
> > > operator,
> > > > the scheduler takes RESTART action to restart the task of the
> > > > evaluation on
> > > > the indefinitely running DAGRun. Upon a new signal is received, the
> > > > scheduler checks with the SignalOperators to determine the action of
> > > > Operator and carries out that action. Then, this scheduling also
> updates
> > > > the state of the task instance, not clear the operator's state in the
> > > > metadata database. At this time, the scheduler cuts out the subgraph
> from
> > > > the DAG and constructs the sub DAG to run.
> > > >
> > > > While keeping a history of operator execution, the scheduling could
> use
> > > > signals and execution history to organize signal conditions of the
> > > > operators. Once the operator of the evaluation receives a new
> signal, the
> > > > scheduler checks whether the signal and previous executions are met
> with
> > > > the conditions. If the received signal is met with the corresponding
> > > > condition and the execution of task instances also meet the
> > > > conditions, the
> > > > scheduler would take the RESTART action repeatedly for model
> evaluation.
> > > >
> > > > For supporting signal based triggers, the signal-based scheduling
> > > requires
> > > > the introduction of SignalOperator for the operators that send
> signals.
> > > > With the SignalOperator, the scheduler could switch to use signals
> of the
> > > > operator and the corresponding condition to decide the action of the
> > > > operator. No matter whether the relationship between upstream tasks
> to
> > > > downstream tasks is one-to-one or one-to-many, this scheduling only
> > > checks
> > > > whether the received signals of the SignalOperator is met with the
> signal
> > > > conditions and concerns about which action to take when these
> conditions
> > > > are met.
> > > >
> > > > By the way, signal-based scheduling needs a few more things to map
> > > signals
> > > > to corresponding actions.
> > > >
> > > > Regards,
> > > > Nicholas Jiang
> > > >
> > > > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> > > > <gcasass...@twitter.com.invalid> wrote:
> > > >
> > > > > That looks interesting. My main worry is how to handle multiple
> > > executions
> > > > > of Signal based operators. If I follow your definition correctly,
> the
> > > idea
> > > > > is to run multiple evaluations of the online trained model (on a
> > > > > permanently running DAGRun). So what happens when you have
> triggered the
> > > > > downstream operators using a signal once? Do you clear state of the
> > > > > operators once a new signal is generated? Do you generate a new
> DAGRun?
> > > > >
> > > > > How do you evaluate the model several times while keeping a
> history of
> > > > > operator execution?
> > > > >
> > > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which
> > > > > basically describe something similar to what you are proposing in
> > > > > this AIP:
> > > > > https://github.com/tensorflow/community/pull/253 (interesting
> read as
> > > > > it’s also related to the ML field).
> > > > >
> > > > > My main concern would be that you may need to change a few more
> > > > > things to
> > > > > support signal based triggers, as the relationship between upstream
> > > tasks
> > > > > to downstream tasks is no longer 1:1 but 1:many.
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez
> > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>,
> wrote:
> > > > > > Hello everyone,
> > > > > >
> > > > > > Sending a message to everyone and collecting feedback on the
> AIP-35 on
> > > > > > adding signal-based scheduling. This was previously briefly
> > > > > mentioned in
> > > > > > the discussion of development slack channel. The key motivation
> of
> > > this
> > > > > > proposal is to support a mixture of batch and stream jobs in the
> same
> > > > > > workflow.
> > > > > >
> > > > > > In practice, there are many business logics that need
> collaboration
> > > > > between
> > > > > > streaming and batch jobs. For example, in machine learning, an
> online
> > > > > > learning job is a streaming job running forever. It emits a model
> > > > > > periodically and a batch evaluate job should run to validate each
> > > model.
> > > > > So
> > > > > > this is a streaming-triggered-batch case. If we continue from the
> > > above
> > > > > > example, once the model passes the validation, the model needs
> to be
> > > > > > deployed to a serving job. That serving job could be a streaming
> > > > > job that
> > > > > > keeps polling records from Kafka and uses a model to do
> prediction.
> > > And
> > > > > the
> > > > > > availability of a new model requires that streaming prediction
> job
> > > either
> > > > > > to restart, or to reload the new model on the fly. In either
> case,
> > > > > it is
> > > > > a
> > > > > > batch-to-streaming job triggering.
> > > > > >
> > > > > > At this point above, people basically have to invent their own
> way to
> > > > > deal
> > > > > > with such workflows consisting of both batch and streaming jobs.
> I
> > > think
> > > > > > having a system that can help smoothly work with a mixture of
> > > streaming
> > > > > and
> > > > > > batch jobs is valuable here.
> > > > > >
> > > > > > This AIP-35 focuses on signal-based scheduling to let the
> operators
> > > send
> > > > > > signals to the scheduler to trigger a scheduling action, such as
> > > starting
> > > > > > jobs, stopping jobs and restarting jobs. With the change of
> > > > > > scheduling mechanism, a streaming job can send signals to the
> > > scheduler
> > > > > to
> > > > > > indicate the state change of the dependencies.
> > > > > >
> > > > > > Signal-based scheduling allows the scheduler to know the change
> of the
> > > > > > dependency state immediately without periodically querying the
> > > metadata
> > > > > > database. This also allows potential support for richer
> scheduling
> > > > > > semantics such as periodic execution and manual trigger at per
> > > operator
> > > > > > granularity.
> > > > > >
> > > > > > Changes proposed:
> > > > > >
> > > > > > - Signals are used to define conditions that must be met to run
> an
> > > > > > operator. State change of the upstream tasks is one type of the
> > > signals.
> > > > > > There may be other types of signals. The scheduler may take
> different
> > > > > > actions when receiving different signals. To let the operators
> take
> > > > > signals
> > > > > > as their starting condition, we propose to introduce
> SignalOperator
> > > which
> > > > > > is mentioned in the public interface section.
> > > > > > - A notification service is necessary to help receive and
> propagate
> > > the
> > > > > > signals from the operators and other sources to the scheduler.
> Upon
> > > > > > receiving a signal, the scheduler can take action according to
> the
> > > > > > predefined signal-based conditions on the operators. Therefore we
> > > propose
> > > > > > to introduce a Signal Notification Service component to Airflow.
> > > > > >
> > > > > > Please see related documents and PRs for details:
> > > > > >
> > > > > > AIP:
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > > >
> > > > > > Please let me know if there are any aspects that you
> agree/disagree
> > > with
> > > > > or
> > > > > > need more clarification (especially the SignalOperator and
> > > > > SignalService).
> > > > > > Any comments are welcome and I am looking forward to it!
> > > > > >
> > > > > > Thanks,
> > > > > > Nicholas
> > > > >
> > > >
> > >
>

Reply via email to