So far Airflow 2.0 has been a moving target and it has no fixed timeline.

This is one of the main reasons why I think we should all in the
community pretty much focus on making sure we deliver 2.0 in it's currently
planned "shape". From the past (and my own experience - I am one of the
"guilty" ones) I know that discussions about new features might get the
community fairly distracted from that main focus. Especially the people who
are the most active, they (myself including) tend to feel obliged to make
comments and go into details of such discussions and since those people
tend to know a lot of ins- and outs- of Airflow and their input and time is
valuable.

And while - obviously - we should not stop thinking of new features, I
believe it makes more sense to mark all such issues as 2.1 by default -
having prototypes and some initial discussions are fine, but with
expectations that more discussions and possibly changes to the whole
approach and maybe even rejection of the idea after deeper discussion will
happen post 2.0.

Explicitly targeting it past 2.0 might simply set the right "expectation"
from everyone involved.

Again - this is just my point of view, and I would love to see what others
think about it.

J.


On Tue, Jun 23, 2020 at 11:07 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Jarek,
>
> Thanks for the reply. I saw the AirFlow 2.0 roadmap[1] and I am wondering
> if there is a timeline for it?
>
> Also, do you see any potential conflict with some existing efforts on
> AirFlow 2.0? We don't want to interrupt the ongoing development pace. And
> we are happy to first implement a prototype and test it with our scenarios.
> Hopefully the implementation is mostly incremental and won't lead to
> patches hard to merge. But we would like to avoid moving into a direction
> different from where the community is heading to. So some advice would be
> appreciated.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1] https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+2.0
>
>
>
> On Mon, Jun 22, 2020 at 5:08 AM Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
>
> > I think this is a good direction in general for Airflow. I saw a number
> of
> > usages that goes beyond the "original" use of Airflow and are kind of
> > contrary to the "Airflow is not a streaming solution" statement from the
> > Airflow main web page. And I think it's crucial for Airflow to stay
> > relevant in the future.
> >
> > But maybe - just maybe - it's a good idea that we all (including all the
> > committers) try to focus on what we are supposed to deliver for 2.0. And
> > defer those kinds of changes to 2.1?
> >
> > I think we have quite a number of things to implement still to complete
> the
> > 2.0 release, and maybe simply focusing to deliver those things we
> promised
> > and let people start moving to 2.0 as soon as possible is a good thing?
> > Otherwise 2.0 seems like a moving target that continues to be further and
> > further in the future.
> >
> > J.
> >
> >
> >
> >
> > On Fri, Jun 19, 2020 at 5:13 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Gerard and Dan,
> > >
> > > Thanks for the comments and suggestions. They are very helpful.
> > >
> > > Please see the replies below:
> > >
> > > *Re: Gerard*
> > >
> > > > • I agree on calling it event based scheduling, may be less
> misleading.
> > >
> > > Event-based scheduling it is. I'll change the term from now on.
> > >
> > > In my opinion, I think the best way would be to extend BaseOperator to
> > > > incorporate signals as suggested before. Adding a new operator seems
> it
> > > > will make things more complex for the end user.
> > >
> > > Sure, either way is fine with me. The introduction of SignalOperator
> was
> > > from the assumption that we will deprecate BaseOperator after
> > > completely switching to event-based scheduling. Thinking about this a
> bit
> > > more, I am unsure whether we need the BaseOperator to inherit
> > > SignalOperator. It might be better to have something like a
> > > BaseOperatorWrapper which is a SignalOperator that wraps a
> BaseOperator.
> > >
> > > For the existing operators that inherit BaseOperator,
> BaseOperatorWrapper
> > > has the on_event() method implemented to mimic the same behavior as
> > > dependency based scheduling. So the existing operators will just work
> > > without any change. For new operator implementations, if the authors
> > choose
> > > to not use events at all, they may just implement the BaseOperator.
> > > Otherwise, they can implement SignalOperator. In either case, the
> actual
> > > scheduling will just be event-based.
> > >
> > > • We should allow user to configure if they want to use Event based
> > > > scheduling (async/signal) or operator dependency scheduling
> > > > (sync/scheduler). This would allow users to choose the mode they want
> > to
> > > > run their DAG, and potentially also change the execution UI for the
> > DAG.
> > >
> > > I think event-based scheduling supports operator dependency scheduling.
> > It
> > > is just a different implementation. And with that implementation, in
> > > addition to operator dependency scheduling, we can trigger scheduling
> > using
> > > something other than the operator dependency.
> > >
> > > • Still concerned on adding logic to issue signals in user code.
> > >
> > > Can you elaborate a little bit on the concerns? Is it about exposing
> the
> > > scheduling control to users without a pre-defined order? Or is it
> > something
> > > else?
> > >
> > > • We should also consider how would we implement a use case where its
> not
> > > > just a long running task, but just several tasks that run in
> different
> > > > schedules and that need a common downstream tasks to be run after
> > either
> > > of
> > > > them finishes. (example: train a new model every time new examples
> are
> > > > available or new embeddings are available). This is why I think a
> more
> > > high
> > > > level design could benefit the proposal. Aka async DAG execution.
> > >
> > > This is exactly one of the use cases we would like to support with
> > > event-based scheduling. In the current proposal, this would be achieve
> as
> > > following:
> > >
> > > 1. The training operator will have two EventKeys as its interested
> > events:
> > > "NewExample" and "NewEmbedding".
> > > 2. The scheduler retrieves the interested event keys of each operator
> in
> > > the DAG and keeps an EventKey -> InterestedOperatorList mapping. In
> this
> > > case, the training operator will appear in both operator lists of
> > > "NewExample" and "NewEmbedding".
> > > 3. When one of those two events comes, the scheduler checks the
> EventKey
> > > and calls Operator#on_events(Event) method on each of the operators in
> > > InterestedOperatorList to get an action that needs to be taken on that
> > > operator. In this case, the operator in question would return START
> when
> > it
> > > sees either "NewExample" or "NewEmbedding".
> > >
> > > • UI is going to be affected by the described clear task instance you
> > > > describe. As a user I will only be able to see the latest task
> instance
> > > and
> > > > that’s no good. Web UI will need to be changed to accommodate this
> and
> > we
> > > > will need to find a way to have multiple task instances of a single
> > > task_id
> > > > + lineage visualization for understanding what event/task instance
> > > > generated what execution of a task instance.
> > >
> > > Yes, you are right. UI also needs to change so it will show the entire
> > > execution history of an operator of a task in this case. That should
> > > probably also be included in the AIP wiki.
> > >
> > >
> > > *Re: Dan*
> > > The Google doc we were referring to is here.
> > >
> > >
> >
> https://docs.google.com/document/d/1Wps5iaGONFdDYgQIR4uTQ9Q4fY6hWatS1crsuLmacys/edit?ts=5ee277bc#heading=h.q7bq5z8z3jqs
> > >
> > > 1. Regarding the scope of the PR.
> > >
> > > Supporting streaming jobs is actually an important motivation of the
> > > proposal. It is true that airflow was not designed to handle long
> running
> > > jobs. In fact, we looked around and did not find any workflow tooling
> > that
> > > supports streaming jobs. And that is one of the reasons that we created
> > > this AIP.
> > >
> > > Although this AIP seemingly moves AirFlow a little towards service
> > > orchestration by adding support for the long running jobs, there is
> > still a
> > > quite clear boundary between AirFlow and service orchestration. A
> service
> > > orchestration does much more than a workflow, it usually has to deal
> with
> > > stuff like resource management, environment setup, auto scaling and
> cross
> > > workflow coordinations, etc. This AIP only proposes another mechanism
> to
> > > deploy a single workflow. It does not even introduce semantics for
> cross
> > > workflow communication. That being said, it is true that with this
> > change,
> > > it would be easier to integrate AirFlow into a service orchestration
> > tool.
> > >
> > > 2. About the interaction between signals/events and existing AirFlow
> > > abstractions.
> > >
> > > > 2. Have we thought through the interactions between signals and
> > existing
> > > > Airflow abstractions especially with respect to edge cases, e.g.
> > handling
> > > > of missed signals, duplicated signals, zombie signals that no longer
> > > exist,
> > > > interactions of STOP and RESTART with task retries, interactions of
> > > signal
> > > > actions (e.g. start task) and scheduler actions (e.g. start task due
> to
> > > > cron schedule), how signals will work with priority mechanisms like
> > task
> > > > priorities/non_pooled_task_slot_count etc, how each signal will
> > interact
> > > > with each task state (e.g. user marks tasks failedi in the UI), or
> will
> > > > that happen in a subsequent doc/pull request?
> > > >
> > >
> > >
> > > In particular I'm curious about the interactions between scheduler
> > polling
> > > > logic that results in task state changes and the new signals system
> as
> > > I'm
> > > > concerned that having both systems will add complexity and we should
> > > tryto
> > > > standardize on one (e.g. instead of a task getting retried by the
> > > scheduler
> > > > if it runs a polling loop and sees that the task received a failed
> > signal
> > > > OR the task getting retried by the scheduler receiving a restart
> task,
> > > > instead we remove the polling logic and replace it with a signal
> > handler
> > > > for tasks that send a FAILED signal). I'm also curious how the REST
> API
> > > > plays into this since it feels like there is some overlap with that
> as
> > > > well.
> > >
> > >
> > > These are great questions. I don't think we have thought all of them
> > > through at this point. But I'll just put some quick ideas here. Please
> > > forgive me if these ideas are too far away from what AirFlow looks like
> > > right now.
> > >
> > >    - handling the missing / duplicate events.
> > >       - On the event sender side, an ack would be necessary to ensure
> an
> > >       event is successfully sent, otherwise a retry can be issued. This
> > > might
> > >       result in duplicates, but is solvable by deduplication.
> > >       - On the event receiver side, given that we are using a pull
> model,
> > >       it is unlikely to miss an event.
> > >    - I am not sure what "zombie events" mean. Can you elaborate? I
> think
> > if
> > >    an event has been sent to the notification service, that event has
> > >    happened, regardless of what happened afterwards.
> > >    - Interaction with task retries.
> > >       - Are the retries defined on each operator? If so, I imagine with
> > >       SignalBasedOperator, the retries will be implemented in the
> > >       SignalOperator#on_event() method. When
> > > operator.on_event(TaskFailedEvent)
> > >       is invoked, the operator simply returns START, and that is
> > > essentially a
> > >       retry.
> > >    - Priority / non_pooled_task_slot_count
> > >       - Would the following basic pattern work with them?
> > >
> > > The basic pattern in the runtime here is following:
> > >
> > >    - *Events* are just facts that passed to the operators.
> > >    - AirFlow provides some A*ctions* the operators can ask AirFlow to
> > take,
> > >    e.g. task start / stop / restart.
> > >    - AirFlow passes the events to the operators, and lets the operators
> > >    decide what actions to take.A
> > >    - At API level, AirFlow has some predefined events -> action
> mappings
> > >    for the operators. e.g. in case of task dependency based scheduling,
> > >    on_event(task_finished) of an operator may be overridden to START,
> > > because
> > >    the action is already defined when users construct the DAG with the
> > API.
> > >    - The REST API requests can be considered as another event and also
> > >    handled by the scheduler.
> > >
> > >
> > > Thanks again for the great feedback and questions.
> > >
> > > Cheers,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Fri, Jun 19, 2020 at 6:08 AM Dan Davydov
> <ddavy...@twitter.com.invalid
> > >
> > > wrote:
> > >
> > > > Metacomment:
> > > > You might want to consider moving this discussion to a google doc or
> > > > something since it seems like a lot of divergent threads are being
> > > created
> > > > due to the scope of this change, which can be a bit hard to keep
> track
> > of
> > > > in email. If folks are concerned about history we can dump the google
> > doc
> > > > contents/comments into the AIP at the end. I see that you mentioned a
> > > > google doc but I didn't see a link so I'm assuming it was some
> private
> > > one.
> > > >
> > > >
> > > > At a high level I think this is a really great idea, and will unlock
> > the
> > > > ability to remove a lot of the bottlenecks of Airflow that are based
> > > around
> > > > the current polling-based task scheduling, and also prevent the need
> > for
> > > > sensor operators in a lot of cases.
> > > >
> > > > 1.
> > > >
> > > > >  I'm in general supportive of this idea of supporting streaming
> jobs.
> > > >
> > > > It's my understanding that the scope of this PR is not to support
> > > streaming
> > > > tasks in Airflow (Nicholas if I'm incorrect let me know since this is
> > an
> > > > important decision). Airflow isn't really designed to work well with
> > > > streaming, typically you use an http operator or something like that
> to
> > > > send signal requests etc. Otherwise AIrflow would need to become kind
> > of
> > > a
> > > > service orchestrator which I think would be outside of the scope of
> > what
> > > > Airflow is designed to do and would be hard to implement in a clean
> way
> > > > using Airflow's current primitives like tasks/dagruns/xcom, though
> it's
> > > > possible I am wrong. I haven't been at Airbnb for a while now, so
> take
> > > this
> > > > with a grain of salt, but the usage of Airflow as a service
> > orchestrator
> > > > IIRC was more of a hack to work around the fact that there was no
> easy
> > > way
> > > > to easily launch lightweight python services in Kubernetes, so
> Airflow
> > > > tasks were used instead.
> > > >
> > > > 2. Have we thought through the interactions between signals and
> > existing
> > > > Airflow abstractions especially with respect to edge cases, e.g.
> > handling
> > > > of missed signals, duplicated signals, zombie signals that no longer
> > > exist,
> > > > interactions of STOP and RESTART with task retries, interactions of
> > > signal
> > > > actions (e.g. start task) and scheduler actions (e.g. start task due
> to
> > > > cron schedule), how signals will work with priority mechanisms like
> > task
> > > > priorities/non_pooled_task_slot_count etc, how each signal will
> > interact
> > > > with each task state (e.g. user marks tasks failedi in the UI), or
> will
> > > > that happen in a subsequent doc/pull request?
> > > >
> > > > In particular I'm curious about the interactions between scheduler
> > > polling
> > > > logic that results in task state changes and the new signals system
> as
> > > I'm
> > > > concerned that having both systems will add complexity and we should
> > > tryto
> > > > standardize on one (e.g. instead of a task getting retried by the
> > > scheduler
> > > > if it runs a polling loop and sees that the task received a failed
> > signal
> > > > OR the task getting retried by the scheduler receiving a restart
> task,
> > > > instead we remove the polling logic and replace it with a signal
> > handler
> > > > for tasks that send a FAILED signal). I'm also curious how the REST
> API
> > > > plays into this since it feels like there is some overlap with that
> as
> > > > well.
> > > >
> > > > Another example of something that should probably be moved to the
> > signal
> > > > system is this part of the AIP:
> > > >
> > > > >   3. Regarding this section in the AIP:
> > > > >     Step 3. Each available worker pulls a TaskInstance from the
> queue
> > > and
> > > > > executes it, sends a signal to  the notification service a signal
> > > > > indicating the task state changes from “queued” to “running”. The
> > > > > scheduling loop will then update the task state in the database
> when
> > > > > receiving this signal..
> > > > >   Shouldn't the DB update logic happen in the schedule signal
> > listener
> > > > > instead of the poll-based scheduling loop?
> > > >
> > > >
> > > >
> > > > On Thu, Jun 18, 2020 at 3:00 PM Gerard Casas Saez
> > > > <gcasass...@twitter.com.invalid> wrote:
> > > >
> > > > > Re-read this a couple times.  Some thoughts:
> > > > >
> > > > > • I agree on calling it event based scheduling, may be less
> > misleading.
> > > > > • I believe we can mostly all agree that this is something
> > interesting
> > > to
> > > > > add to Airflow and a use case that would be good to support.
> Nailing
> > > the
> > > > > details and path to implement is important though. (or at least
> > that’s
> > > my
> > > > > understanding)
> > > > > • In my opinion, I think the best way would be to extend
> BaseOperator
> > > to
> > > > > incorporate signals as suggested before. Adding a new operator
> seems
> > it
> > > > > will make things more complex for the end user.
> > > > >     • We should allow user to configure if they want to use Event
> > based
> > > > > scheduling (async/signal) or operator dependency scheduling
> > > > > (sync/scheduler). This would allow users to choose the mode they
> want
> > > to
> > > > > run their DAG, and potentially also change the execution UI for the
> > > DAG.
> > > > > • Still concerned on adding logic to issue signals in user code.
> > > > >     • An alternative option here is to use XCom/data lineage
> > artifacts
> > > > for
> > > > > signal backend. Aka when an operator A pushes a new value of XCom
> > keyed
> > > > K,
> > > > > execute operator B with that XCom value. Most of the use cases I
> can
> > > > think
> > > > > of for even based scheduling can adapt to this concept where
> > downstream
> > > > > operators need to run whenever a new data artifact is generate by
> an
> > > > > upstream artifact.
> > > > >     • Issue here is we don't have good data artifact representation
> > on
> > > > > Airflow as they usually tend to be implicit (over explicit).
> > Hopefully
> > > w
> > > > > Functional DAGs we can improve this.
> > > > > • We should also consider how would we implement a use case where
> its
> > > not
> > > > > just a long running task, but just several tasks that run in
> > different
> > > > > schedules and that need a common downstream tasks to be run after
> > > either
> > > > of
> > > > > them finishes. (example: train a new model every time new examples
> > are
> > > > > available or new embeddings are available). This is why I think a
> > more
> > > > high
> > > > > level design could benefit the proposal. Aka async DAG execution.
> > > > > • UI is going to be affected by the described clear task instance
> you
> > > > > describe. As a user I will only be able to see the latest task
> > instance
> > > > and
> > > > > that’s no good. Web UI will need to be changed to accommodate this
> > and
> > > we
> > > > > will need to find a way to have multiple task instances of a single
> > > > task_id
> > > > > + lineage visualization for understanding what event/task instance
> > > > > generated what execution of a task instance.
> > > > >
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez
> > > > > On Jun 17, 2020, 10:22 AM -0600, Becket Qin <becket....@gmail.com
> >,
> > > > wrote:
> > > > > > Hi Kevin and Ash,
> > > > > >
> > > > > > Thanks for the feedback / comments / suggestions. It is great to
> > know
> > > > > that
> > > > > > Airbnb has already been running stream jobs using AirFlow and
> there
> > > > might
> > > > > > be a working solution.
> > > > > >
> > > > > > First of all, I'd like to say that we are still quite new to the
> > > > AirFlow,
> > > > > > our proposal likely has overlooked many things and that's why we
> > > would
> > > > > like
> > > > > > to get advices from the community. So we really appreciate
> > > suggestions
> > > > > and
> > > > > > feedback.
> > > > > >
> > > > > > I can't agree more that we should introduce mechanisms to AirFlow
> > > > instead
> > > > > > of just making it support some specific use cases. The two
> > scenarios
> > > > > > mentioned by Nicholas above are just some examples to explain how
> > > > > > signal-based scheduling enables new possibility. It does not
> > suggest
> > > > that
> > > > > > our goal is just to make those two cases work.
> > > > > >
> > > > > > Just to add some background to the proposal, the original
> > motivation
> > > of
> > > > > > this AIP actually came from our project in Apache Flink community
> > > > called
> > > > > AI
> > > > > > Flow[1][2]. It aims to define a clear API for people to describe
> > > their
> > > > > > machine learning workflows that contain both stream and batch
> jobs.
> > > We
> > > > > have
> > > > > > a basic implementation of signal-based scheduling but would like
> to
> > > > > switch
> > > > > > to AirFlow so we don't re-invent wheels.
> > > > > >
> > > > > > In any case, it is probably useful to clarify a little bit on the
> > > > > rational
> > > > > > behind the proposal. The current AIP-35 proposes two major
> changes:
> > > > > >
> > > > > > *Signal based-scheduling*
> > > > > > The key idea is to expand the conditions to trigger a scheduling
> > > > action,
> > > > > > from job status changes to a more extensible concept of *signal*
> or
> > > > > *event.
> > > > > > *This concept extension brings two benefits:
> > > > > >
> > > > > > 1. Make AirFlow capable of dealing with richer types of workflows
> > > > > > including long running jobs more smoothly.
> > > > > > Although the two cases mentioned above may be supported with
> > AirFlow
> > > as
> > > > > > is, the solutions seem a workaround and are not something
> designed
> > as
> > > > the
> > > > > > first class citizen. It essentially relies on the user code to
> put
> > > > > together
> > > > > > some disjoint workflow submissions to form a complete business
> > logic.
> > > > > > Ideally we would like to see the entire business logic as an
> > integral
> > > > > > workflow.
> > > > > > 2. Allow downstream projects to integrate with AirFlow more
> easily.
> > > > > > As an example, imagine an operator in a workflow which deploys a
> > > model
> > > > > > to an online inference service. In some cases, such model
> > deployment
> > > > also
> > > > > > requires external approvals which is not a part of the workflow.
> > > With a
> > > > > > scheduler taking external signals, it becomes much easier to
> > > integrate
> > > > > the
> > > > > > workflow with internal approval systems.
> > > > > >
> > > > > > *Signal notification service*
> > > > > > Signal notification service is also helpful in two aspects:
> > > > > >
> > > > > > 1. It helps isolate the signal sender and signal receivers.
> > > > > > That means an operator who sends the signal does not need to be
> > aware
> > > > of
> > > > > > who might be listening to this signal and what action have to be
> > > taken.
> > > > > One
> > > > > > signal may have multiple listeners and thus multiple workflow
> > actions
> > > > may
> > > > > > be triggered.
> > > > > > 2. It allows pluggability and again integration with existing
> > > systems.
> > > > > > For example, some users may decide to have a Kafka based
> > notification
> > > > > > service.
> > > > > >
> > > > > > We do realize that this proposal is a significant change to
> AirFlow
> > > and
> > > > > > expect taking some time to sort everything out. I am wondering if
> > > doing
> > > > > the
> > > > > > following in order would help the discussion.
> > > > > >
> > > > > > 1. First reach consensus on whether this is worth doing or not.
> > > > > > 2. If we think it is worth doing, what would be the ideal final
> > > state.
> > > > > > 3. Come up with the implementation plan and migration path from
> the
> > > > > current
> > > > > > state to the ideal final state.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Re: Ash
> > > > > > About some of the questions and suggestions.
> > > > > >
> > > > > > And I'm also not a fan of the name -- perhaps just my background
> > and
> > > > > > > dealing with the scheduler and executor code -- but to me a
> > signal
> > > > is a
> > > > > > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> > > > > described
> > > > > > > as "Event-driven Scheduling".
> > > > > >
> > > > > >
> > > > > > I don't have a strong opinion on the naming. We picked "signal"
> > > instead
> > > > > of
> > > > > > "event" simply because that sounds more relevant to "control".
> But
> > I
> > > > > think
> > > > > > "event" is also a suitable term and it is also intuitive enough.
> > > > > >
> > > > > > In terms of your proposed class layout:
> > > > > > > class BaseOperator(SignalOperator, LoggingMixin):
> > > > > > > Don't create the extra class, just put new things on
> > BaseOperator.
> > > > > >
> > > > > >
> > > > > > Tao raised a similar question in the google doc. I agree that it
> > > would
> > > > > work
> > > > > > by putting new things on the BaseOperator. I think one reason of
> > > > having a
> > > > > > separate SignalOperator class is that theoretically speaking, the
> > > > > scheduler
> > > > > > only needs to deal with the SignalOperator interface. If so, we
> may
> > > > want
> > > > > to
> > > > > > eventually migrate from the current BaseOperator to
> SignalOperator.
> > > The
> > > > > job
> > > > > > status change will just become one type of the signals. So the
> new
> > > > > > operators might be able to just implement a SignalOperator
> instead
> > of
> > > > > > BaseOperator. Having a separate SignalOperator would make it
> easier
> > > to
> > > > > > eventually converge to SignalOperator and deprecate the current
> > > > > > BaseOperator.
> > > > > >
> > > > > > Doesn't this require custom code inside the streaming job to
> report
> > > > > > > events back to Airflow?
> > > > > >
> > > > > >
> > > > > > For the job status change events / signals, users don't have to
> do
> > > > > > anything. It is just currently the job status are communicated
> back
> > > the
> > > > > > scheduler via Database, now it is going to be via the signal
> > > > notification
> > > > > > service.
> > > > > > For the signals that is not job status related, user code needs
> to
> > > > send a
> > > > > > signal / event at some point.
> > > > > >
> > > > > > 3. How do we notice if that long running task dies without a
> trace
> > --
> > > > > > > i.e. if it crashes hard/the Kuberenetes node it is on dies, etc
> > and
> > > > > > > never signals that it errored how does Airflow know?
> > > > > >
> > > > > >
> > > > > > In our basic implementation, we have a job status listener
> > component
> > > > that
> > > > > > can work with K8S / Yarn / Other cloud platforms, to keep track
> of
> > > the
> > > > > job
> > > > > > status change. And the job status change signals are actually
> sent
> > by
> > > > it.
> > > > > > Does AirFlow right now also listen to the status of the jobs
> > running
> > > in
> > > > > > K8S? Can you help us understand a bit more on the current failure
> > > > > handling
> > > > > > model of AirFlow?
> > > > > >
> > > > > > 4. You're new key component of th Signal Notification Service is
> > not
> > > > > > > clearly defined.
> > > > > > > Where does it store things?
> > > > > > > Does it need persistent storage, or is in-memory good enough?
> > > > > > > Does it need at-most-once delivery semantics? At-least-once
> > > delivery?
> > > > > > > Does the scheduler pull from it, or does signal service push to
> > the
> > > > > > > scheduler.
> > > > > > > What happens when there are more than one scheduler running
> > > (AIP-15,
> > > > > > > which I am working on right now)
> > > > > > > How do we run more than one notification service component
> (i.e.
> > HA
> > > > --
> > > > > > > once the scheduler is HA we can't have any other new components
> > > > being a
> > > > > > > single point of failure)
> > > > > > > Do signals get delivered to just one scheduler, or all
> > schedulers?
> > > > > >
> > > > > >
> > > > > > Sorry about the confusion. It is true that a bunch of details are
> > > > missed
> > > > > > here. We will add them into the wiki. To answer the questions
> > > quickly:
> > > > > >
> > > > > > - In our basic implementation, default signal notification
> service
> > > > > > stores the signal in my sql.
> > > > > > - The notification service has a REST API works in a long-pull
> > > manner,
> > > > > > i.e. a client sends an http request with the last version of the
> > keys
> > > > it
> > > > > > has seen, and that request parking waits on the server side until
> > one
> > > > of
> > > > > > the following is met:
> > > > > > - Request timeout is reached, in which case an empty response is
> > > > > > returned.
> > > > > > - A new version of the interested signal is available. The
> > > > > > notification service returns all the SignalVersions newer than
> > > > > > the version
> > > > > > last seen by the client.
> > > > > > - The scheduler pulls from the notification service, but in a
> > > long-pull
> > > > > > manner.
> > > > > > - Off the top of my head, the notification service probably just
> > > needs
> > > > > > one instance which supports multiple schedulers. It could have
> more
> > > > > > instances for scalability.
> > > > > > - Technically speaking the notification service frontend itself
> is
> > > > > > stateless. So failure recovery is simple. So if the database of
> the
> > > > > > notification service has HA. It should be sufficient.
> > > > > > - Signals will be delivered to all the schedulers that are
> > listening
> > > to
> > > > > > that signal.
> > > > > >
> > > > > > 5. Does this now mean to get started out with Airflow you need to
> > run
> > > > > > > three components: webserver, scheduler and signal service?
> > > > > >
> > > > > >
> > > > > > Not sure if there is a problem, but I imagine for a local
> > > deployment, a
> > > > > > signal service could be a part of the webserver, just with one
> more
> > > > REST
> > > > > > endpoint.
> > > > > >
> > > > > > Thanks again for all the great feedback and looking forward to
> your
> > > > > > suggestions.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > [1] https://www.youtube.com/watch?v=xiYJTCj2zUU
> > > > > > [2]
> > > > >
> > https://www.slideshare.net/JiangjieQin/ai-made-easy-with-flink-ai-flow
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 17, 2020 at 6:16 PM Ash Berlin-Taylor <
> a...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > I also agree with Kevin - I'd love to see better streaming
> > support
> > > in
> > > > > > > Airflow, but I'm not sure this is the way to go about it.
> > Something
> > > > > > > about it feels not quite right.
> > > > > > >
> > > > > > > And I'm also not a fan of the name -- perhaps just my
> background
> > > and
> > > > > > > dealing with the scheduler and executor code -- but to me a
> > signal
> > > > is a
> > > > > > > Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was
> > > > > described
> > > > > > > as "Event-driven Scheduling".
> > > > > > >
> > > > > > >
> > > > > > > To take your example:
> > > > > > >
> > > > > > > > For example, in machine learning, an online
> > > > > > > > learning job is a streaming job running forever. It emits a
> > model
> > > > > > > > periodically and a batch evaluate job should run to validate
> > each
> > > > > model
> > > > > > >
> > > > > > > This is possible right now in Airflow by making an API request
> to
> > > > > > > trigger a run of the validation DAG.
> > > > > > >
> > > > > > > In terms of your proposed class layout:
> > > > > > >
> > > > > > > class BaseOperator(SignalOperator, LoggingMixin):
> > > > > > >
> > > > > > > Don't create the extra class, just put new things on
> > BaseOperator.
> > > > > > >
> > > > > > > As Chris said:
> > > > > > >
> > > > > > > > Are you saying that you actually have tasks in Airflow that
> are
> > > > > > > > intended to run indefinitely?
> > > > > > >
> > > > > > > 1. How is Airflow meant to handle that task? Right now it runs
> it
> > > > _and
> > > > > > > waits for completion_. It's not clear you've actually thought
> > > through
> > > > > > > everything this change would involve at the code level --
> > > > specifically
> > > > > > > in the Executors.
> > > > > > >
> > > > > > > 2. Doesn't this require custom code inside the streaming job to
> > > > report
> > > > > > > events back to Airflow?
> > > > > > >
> > > > > > > I am very uneasy about _needing_ to make changes in the "task
> > code"
> > > > to
> > > > > > > support running it under Airflow.
> > > > > > >
> > > > > > > One of the big plus points to me is that right now with Airflow
> > can
> > > > > > > run whatever tasks or jobs you like, without your job code
> > needing
> > > to
> > > > > > > know or care about Airflow. This approach changes that. And
> this
> > is
> > > > not
> > > > > > > a decision to be made lightly.
> > > > > > >
> > > > > > > 3. How do we notice if that long running task dies without a
> > trace
> > > --
> > > > > > > i.e. if it crashes hard/the Kuberenetes node it is on dies, etc
> > and
> > > > > > > never signals that it errored how does Airflow know?
> > > > > > >
> > > > > > > 4. You're new key component of th Signal Notification Service
> is
> > > not
> > > > > > > clearly defined.
> > > > > > >
> > > > > > > Where does it store things?
> > > > > > >
> > > > > > > Does it need persistent storage, or is in-memory good enough?
> > > > > > >
> > > > > > > Does it need at-most-once delivery semantics? At-least-once
> > > delivery?
> > > > > > >
> > > > > > > Does the scheduler pull from it, or does signal service push to
> > the
> > > > > > > scheduler.
> > > > > > >
> > > > > > > What happens when there are more than one scheduler running
> > > (AIP-15,
> > > > > > > which I am working on right now)
> > > > > > >
> > > > > > > How do we run more than one notification service component
> (i.e.
> > HA
> > > > --
> > > > > > > once the scheduler is HA we can't have any other new components
> > > > being a
> > > > > > > single point of failure)
> > > > > > >
> > > > > > > Do signals get delivered to just one scheduler, or all
> > schedulers?
> > > > > > >
> > > > > > > 5. Does this now mean to get started out with Airflow you need
> to
> > > run
> > > > > > > three components: webserver, scheduler and signal service?
> > > > > > >
> > > > > > > (Although the performance on a laptop doesn't matter _too_
> much I
> > > am
> > > > > > > concerned that we keep the "getting started" experience as easy
> > as
> > > > > > > possible for new users)
> > > > > > >
> > > > > > > In short I'd like to see _a lot_ more detail on this proposal.
> > It's
> > > > > > > proposing a fundamental change to Airflow and there are still
> > lots
> > > of
> > > > > > > things not covered in enough detail.
> > > > > > >
> > > > > > > I'm also not sure it's even needed for your example workflow.
> For
> > > > > > > instance one way of dealing with it right now would be:
> > > > > > >
> > > > > > > - A task to start a streaming job. This is essentially
> > > > fire-and-forget.
> > > > > > > - A dag with schedule_interval=None that is "manually" invoked
> > (by
> > > > API
> > > > > > > request from within your streaming code) with these two tasks:
> > > > > > >
> > > > > > > model build task >> model validation task >> streaming
> "restart"
> > > task
> > > > > > >
> > > > > > > i.e. this manually triggerd dag does the job of your signals
> > > proposal
> > > > > > > if I've understood your example correctly. I'm not saying this
> > is a
> > > > > > > _good_ way of doing it, just A way. Have I correctly understood
> > > your
> > > > > > > example?
> > > > > > >
> > > > > > >
> > > > > > > -ash
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > Hi Gerard,
> > > > > > > >
> > > > > > > > If users follow the definition of SignalOperator correctly,
> the
> > > > idea
> > > > > for
> > > > > > > > the streaming-triggered-batch case is to restart the
> execution
> > > for
> > > > > > > > evaluations of the online trained model. In other words, once
> > the
> > > > > > > > evaluation operator receives the signal from the online
> > learning
> > > > > > > operator,
> > > > > > > > the scheduler takes RESTART action to restart the task of the
> > > > > > > > evaluation on
> > > > > > > > the indefinitely running DAGRun. Upon a new signal is
> received,
> > > the
> > > > > > > > scheduler checks with the SignalOperators to determine the
> > action
> > > > of
> > > > > > > > Operator and carries out that action. Then, this scheduling
> > also
> > > > > updates
> > > > > > > > the state of the task instance, not clear the operator's
> state
> > in
> > > > the
> > > > > > > > metadata database. At this time, the scheduler cuts out the
> > > > subgraph
> > > > > from
> > > > > > > > the DAG and constructs the sub DAG to run.
> > > > > > > >
> > > > > > > > While keeping a history of operator execution, the scheduling
> > > could
> > > > > use
> > > > > > > > signals and execution history to organize signal conditions
> of
> > > the
> > > > > > > > operators. Once the operator of the evaluation receives a new
> > > > > signal, the
> > > > > > > > scheduler checks whether the signal and previous executions
> are
> > > met
> > > > > with
> > > > > > > > the conditions. If the received signal is met with the
> > > > corresponding
> > > > > > > > condition and the execution of task instances also meet the
> > > > > > > > conditions, the
> > > > > > > > scheduler would take the RESTART action repeatedly for model
> > > > > evaluation.
> > > > > > > >
> > > > > > > > For supporting signal based triggers, the signal-based
> > scheduling
> > > > > > > requires
> > > > > > > > the introduction of SignalOperator for the operators that
> send
> > > > > signals.
> > > > > > > > With the SignalOperator, the scheduler could switch to use
> > > signals
> > > > > of the
> > > > > > > > operator and the corresponding condition to decide the action
> > of
> > > > the
> > > > > > > > operator. No matter whether the relationship between upstream
> > > tasks
> > > > > to
> > > > > > > > downstream tasks is one-to-one or one-to-many, this
> scheduling
> > > only
> > > > > > > checks
> > > > > > > > whether the received signals of the SignalOperator is met
> with
> > > the
> > > > > signal
> > > > > > > > conditions and concerns about which action to take when these
> > > > > conditions
> > > > > > > > are met.
> > > > > > > >
> > > > > > > > By the way, signal-based scheduling needs a few more things
> to
> > > map
> > > > > > > signals
> > > > > > > > to corresponding actions.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Nicholas Jiang
> > > > > > > >
> > > > > > > > On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> > > > > > > > <gcasass...@twitter.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > That looks interesting. My main worry is how to handle
> > multiple
> > > > > > > executions
> > > > > > > > > of Signal based operators. If I follow your definition
> > > correctly,
> > > > > the
> > > > > > > idea
> > > > > > > > > is to run multiple evaluations of the online trained model
> > (on
> > > a
> > > > > > > > > permanently running DAGRun). So what happens when you have
> > > > > triggered the
> > > > > > > > > downstream operators using a signal once? Do you clear
> state
> > of
> > > > the
> > > > > > > > > operators once a new signal is generated? Do you generate a
> > new
> > > > > DAGRun?
> > > > > > > > >
> > > > > > > > > How do you evaluate the model several times while keeping a
> > > > > history of
> > > > > > > > > operator execution?
> > > > > > > > >
> > > > > > > > > Related to this, TFX has a similar proposal for ASYNC DAGs,
> > > which
> > > > > > > > > basically describe something similar to what you are
> > proposing
> > > in
> > > > > > > > > this AIP:
> > > > > > > > > https://github.com/tensorflow/community/pull/253
> > (interesting
> > > > > read as
> > > > > > > > > it’s also related to the ML field).
> > > > > > > > >
> > > > > > > > > My main concern would be that you may need to change a few
> > more
> > > > > > > > > things to
> > > > > > > > > support signal based triggers, as the relationship between
> > > > upstream
> > > > > > > tasks
> > > > > > > > > to downstream tasks is no longer 1:1 but 1:many.
> > > > > > > > >
> > > > > > > > > Gerard Casas Saez
> > > > > > > > > Twitter | Cortex | @casassaez
> > > > > > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <
> > thanosxnicho...@gmail.com
> > > >,
> > > > > wrote:
> > > > > > > > > > Hello everyone,
> > > > > > > > > >
> > > > > > > > > > Sending a message to everyone and collecting feedback on
> > the
> > > > > AIP-35 on
> > > > > > > > > > adding signal-based scheduling. This was previously
> briefly
> > > > > > > > > mentioned in
> > > > > > > > > > the discussion of development slack channel. The key
> > > motivation
> > > > > of
> > > > > > > this
> > > > > > > > > > proposal is to support a mixture of batch and stream jobs
> > in
> > > > the
> > > > > same
> > > > > > > > > > workflow.
> > > > > > > > > >
> > > > > > > > > > In practice, there are many business logics that need
> > > > > collaboration
> > > > > > > > > between
> > > > > > > > > > streaming and batch jobs. For example, in machine
> learning,
> > > an
> > > > > online
> > > > > > > > > > learning job is a streaming job running forever. It
> emits a
> > > > model
> > > > > > > > > > periodically and a batch evaluate job should run to
> > validate
> > > > each
> > > > > > > model.
> > > > > > > > > So
> > > > > > > > > > this is a streaming-triggered-batch case. If we continue
> > from
> > > > the
> > > > > > > above
> > > > > > > > > > example, once the model passes the validation, the model
> > > needs
> > > > > to be
> > > > > > > > > > deployed to a serving job. That serving job could be a
> > > > streaming
> > > > > > > > > job that
> > > > > > > > > > keeps polling records from Kafka and uses a model to do
> > > > > prediction.
> > > > > > > And
> > > > > > > > > the
> > > > > > > > > > availability of a new model requires that streaming
> > > prediction
> > > > > job
> > > > > > > either
> > > > > > > > > > to restart, or to reload the new model on the fly. In
> > either
> > > > > case,
> > > > > > > > > it is
> > > > > > > > > a
> > > > > > > > > > batch-to-streaming job triggering.
> > > > > > > > > >
> > > > > > > > > > At this point above, people basically have to invent
> their
> > > own
> > > > > way to
> > > > > > > > > deal
> > > > > > > > > > with such workflows consisting of both batch and
> streaming
> > > > jobs.
> > > > > I
> > > > > > > think
> > > > > > > > > > having a system that can help smoothly work with a
> mixture
> > of
> > > > > > > streaming
> > > > > > > > > and
> > > > > > > > > > batch jobs is valuable here.
> > > > > > > > > >
> > > > > > > > > > This AIP-35 focuses on signal-based scheduling to let the
> > > > > operators
> > > > > > > send
> > > > > > > > > > signals to the scheduler to trigger a scheduling action,
> > such
> > > > as
> > > > > > > starting
> > > > > > > > > > jobs, stopping jobs and restarting jobs. With the change
> of
> > > > > > > > > > scheduling mechanism, a streaming job can send signals to
> > the
> > > > > > > scheduler
> > > > > > > > > to
> > > > > > > > > > indicate the state change of the dependencies.
> > > > > > > > > >
> > > > > > > > > > Signal-based scheduling allows the scheduler to know the
> > > change
> > > > > of the
> > > > > > > > > > dependency state immediately without periodically
> querying
> > > the
> > > > > > > metadata
> > > > > > > > > > database. This also allows potential support for richer
> > > > > scheduling
> > > > > > > > > > semantics such as periodic execution and manual trigger
> at
> > > per
> > > > > > > operator
> > > > > > > > > > granularity.
> > > > > > > > > >
> > > > > > > > > > Changes proposed:
> > > > > > > > > >
> > > > > > > > > > - Signals are used to define conditions that must be met
> to
> > > run
> > > > > an
> > > > > > > > > > operator. State change of the upstream tasks is one type
> of
> > > the
> > > > > > > signals.
> > > > > > > > > > There may be other types of signals. The scheduler may
> take
> > > > > different
> > > > > > > > > > actions when receiving different signals. To let the
> > > operators
> > > > > take
> > > > > > > > > signals
> > > > > > > > > > as their starting condition, we propose to introduce
> > > > > SignalOperator
> > > > > > > which
> > > > > > > > > > is mentioned in the public interface section.
> > > > > > > > > > - A notification service is necessary to help receive and
> > > > > propagate
> > > > > > > the
> > > > > > > > > > signals from the operators and other sources to the
> > > scheduler.
> > > > > Upon
> > > > > > > > > > receiving a signal, the scheduler can take action
> according
> > > to
> > > > > the
> > > > > > > > > > predefined signal-based conditions on the operators.
> > > Therefore
> > > > we
> > > > > > > propose
> > > > > > > > > > to introduce a Signal Notification Service component to
> > > > Airflow.
> > > > > > > > > >
> > > > > > > > > > Please see related documents and PRs for details:
> > > > > > > > > >
> > > > > > > > > > AIP:
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > > > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > > > > > > >
> > > > > > > > > > Please let me know if there are any aspects that you
> > > > > agree/disagree
> > > > > > > with
> > > > > > > > > or
> > > > > > > > > > need more clarification (especially the SignalOperator
> and
> > > > > > > > > SignalService).
> > > > > > > > > > Any comments are welcome and I am looking forward to it!
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Nicholas
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > [image: Polidea] <https://www.polidea.com/>
> >
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Reply via email to