I also agree with Kevin - I'd love to see better streaming support in
Airflow, but I'm not sure this is the way to go about it. Something
about it feels not quite right.

And I'm also not a fan of the name -- perhaps just my background and
dealing with the scheduler and executor code -- but to me a signal is a
Unix signal (SIGTERM, SIGKILL) etc. I'd be happier if this was described
as "Event-driven Scheduling".


To take your example:

> For example, in machine learning, an online
> learning job is a streaming job running forever. It emits a model
> periodically and a batch evaluate job should run to validate each model

This is possible right now in Airflow by making an API request to
trigger a run of the validation DAG.

In terms of your proposed class layout:

    class BaseOperator(SignalOperator, LoggingMixin):

Don't create the extra class, just put new things on BaseOperator.

As Chris said:

> Are you saying that you actually have tasks in Airflow that are
> intended to run indefinitely?

1. How is Airflow meant to handle that task? Right now it runs it _and
waits for completion_. It's not clear you've actually thought through
everything this change would involve at the code level -- specifically
in the Executors.

2. Doesn't this require custom code inside the streaming job to report
events back to Airflow?

  I am very uneasy about _needing_ to make changes in the "task code" to
support running it under Airflow.

  One of the big plus points to me is that right now with Airflow can
run whatever tasks or jobs you like, without your job code needing to
know or care about Airflow. This approach changes that. And this is not
a decision to be made lightly.

3. How do we notice if that long running task dies without a trace --
i.e. if it crashes hard/the Kuberenetes node it is on dies, etc and
never signals that it errored how does Airflow know?

4. You're new key component of th Signal Notification Service is not
clearly defined.

  Where does it store things?

  Does it need persistent storage, or is in-memory good enough?

  Does it need at-most-once delivery semantics? At-least-once delivery?

  Does the scheduler pull from it, or does signal service push to the scheduler.

  What happens when there are more than one scheduler running (AIP-15,
which I am working on right now)

  How do we run more than one notification service component (i.e. HA --
once the scheduler is HA we can't have any other new components being a
single point of failure)

  Do signals get delivered to just one scheduler, or all schedulers?

5. Does this now mean to get started out with Airflow you need to run
three components: webserver, scheduler and signal service?

  (Although the performance on a laptop doesn't matter _too_ much I am
concerned that we keep the "getting started" experience as easy as
possible for new users)

In short I'd like to see _a lot_ more detail on this proposal. It's
proposing a fundamental change to Airflow and there are still lots of
things not covered in enough detail.

I'm also not sure it's even needed for your example workflow. For
instance one way of dealing with it right now would be:

- A task to start a streaming job. This is essentially fire-and-forget.
- A dag with schedule_interval=None that is "manually" invoked (by API
request from within your streaming code) with these two tasks:
     
      model build task >> model validation task >> streaming "restart" task

  i.e. this manually triggerd dag does the job of your signals proposal
if I've understood your example correctly. I'm not saying this is a
_good_ way of doing it, just A way. Have I correctly understood your example?


-ash



On Jun 17 2020, at 10:21 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:

> Hi Gerard,
>  
> If users follow the definition of SignalOperator correctly, the idea for
> the streaming-triggered-batch case is to restart the execution for
> evaluations of the online trained model. In other words, once the
> evaluation operator receives the signal from the online learning operator,
> the scheduler takes RESTART action to restart the task of the
> evaluation on
> the indefinitely running DAGRun. Upon a new signal is received, the
> scheduler checks with the SignalOperators to determine the action of
> Operator and carries out that action. Then, this scheduling also updates
> the state of the task instance, not clear the operator's state in the
> metadata database. At this time, the scheduler cuts out the subgraph from
> the DAG and constructs the sub DAG to run.
>  
> While keeping a history of operator execution, the scheduling could use
> signals and execution history to organize signal conditions of the
> operators. Once the operator of the evaluation receives a new signal, the
> scheduler checks whether the signal and previous executions are met with
> the conditions. If the received signal is met with the corresponding
> condition and the execution of task instances also meet the
> conditions, the
> scheduler would take the RESTART action repeatedly for model evaluation.
>  
> For supporting signal based triggers, the signal-based scheduling requires
> the introduction of SignalOperator for the operators that send signals.
> With the SignalOperator, the scheduler could switch to use signals of the
> operator and the corresponding condition to decide the action of the
> operator. No matter whether the relationship between upstream tasks to
> downstream tasks is one-to-one or one-to-many, this scheduling only checks
> whether the received signals of the SignalOperator is met with the signal
> conditions and concerns about which action to take when these conditions
> are met.
>  
> By the way, signal-based scheduling needs a few more things to map signals
> to corresponding actions.
>  
> Regards,
> Nicholas Jiang
>  
> On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
> <gcasass...@twitter.com.invalid> wrote:
>  
>> That looks interesting. My main worry is how to handle multiple executions
>> of Signal based operators. If I follow your definition correctly, the idea
>> is to run multiple evaluations of the online trained model (on a
>> permanently running DAGRun). So what happens when you have triggered the
>> downstream operators using a signal once? Do you clear state of the
>> operators once a new signal is generated? Do you generate a new DAGRun?
>>  
>> How do you evaluate the model several times while keeping a history of
>> operator execution?
>>  
>> Related to this, TFX has a similar proposal for ASYNC DAGs, which
>> basically describe something similar to what you are proposing in
>> this AIP:
>> https://github.com/tensorflow/community/pull/253 (interesting read as
>> it’s also related to the ML field).
>>  
>> My main concern would be that you may need to change a few more
>> things to
>> support signal based triggers, as the relationship between upstream tasks
>> to downstream tasks is no longer 1:1 but 1:many.
>>  
>> Gerard Casas Saez
>> Twitter | Cortex | @casassaez
>> On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
>> > Hello everyone,
>> >
>> > Sending a message to everyone and collecting feedback on the AIP-35 on
>> > adding signal-based scheduling. This was previously briefly
>> mentioned in
>> > the discussion of development slack channel. The key motivation of this
>> > proposal is to support a mixture of batch and stream jobs in the same
>> > workflow.
>> >
>> > In practice, there are many business logics that need collaboration
>> between
>> > streaming and batch jobs. For example, in machine learning, an online
>> > learning job is a streaming job running forever. It emits a model
>> > periodically and a batch evaluate job should run to validate each model.
>> So
>> > this is a streaming-triggered-batch case. If we continue from the above
>> > example, once the model passes the validation, the model needs to be
>> > deployed to a serving job. That serving job could be a streaming
>> job that
>> > keeps polling records from Kafka and uses a model to do prediction. And
>> the
>> > availability of a new model requires that streaming prediction job either
>> > to restart, or to reload the new model on the fly. In either case,
>> it is
>> a
>> > batch-to-streaming job triggering.
>> >
>> > At this point above, people basically have to invent their own way to
>> deal
>> > with such workflows consisting of both batch and streaming jobs. I think
>> > having a system that can help smoothly work with a mixture of streaming
>> and
>> > batch jobs is valuable here.
>> >
>> > This AIP-35 focuses on signal-based scheduling to let the operators send
>> > signals to the scheduler to trigger a scheduling action, such as starting
>> > jobs, stopping jobs and restarting jobs. With the change of
>> > scheduling mechanism, a streaming job can send signals to the scheduler
>> to
>> > indicate the state change of the dependencies.
>> >
>> > Signal-based scheduling allows the scheduler to know the change of the
>> > dependency state immediately without periodically querying the metadata
>> > database. This also allows potential support for richer scheduling
>> > semantics such as periodic execution and manual trigger at per operator
>> > granularity.
>> >
>> > Changes proposed:
>> >
>> > - Signals are used to define conditions that must be met to run an
>> > operator. State change of the upstream tasks is one type of the signals.
>> > There may be other types of signals. The scheduler may take different
>> > actions when receiving different signals. To let the operators take
>> signals
>> > as their starting condition, we propose to introduce SignalOperator which
>> > is mentioned in the public interface section.
>> > - A notification service is necessary to help receive and propagate the
>> > signals from the operators and other sources to the scheduler. Upon
>> > receiving a signal, the scheduler can take action according to the
>> > predefined signal-based conditions on the operators. Therefore we propose
>> > to introduce a Signal Notification Service component to Airflow.
>> >
>> > Please see related documents and PRs for details:
>> >
>> > AIP:
>> >
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
>> > Issue: https://github.com/apache/airflow/issues/9321
>> >
>> > Please let me know if there are any aspects that you agree/disagree with
>> or
>> > need more clarification (especially the SignalOperator and
>> SignalService).
>> > Any comments are welcome and I am looking forward to it!
>> >
>> > Thanks,
>> > Nicholas
>>  
>

Reply via email to