Hi Gerard,

If users follow the definition of SignalOperator correctly, the idea for
the streaming-triggered-batch case is to restart the execution that
evaluates the online trained model. In other words, once the evaluation
operator receives the signal from the online learning operator, the
scheduler takes the RESTART action and restarts the evaluation task within
the indefinitely running DAGRun. When a new signal is received, the
scheduler checks with the SignalOperators to determine the action for each
operator and carries out that action. Scheduling then updates the state of
the task instance; it does not clear the operator's state in the metadata
database. At that point, the scheduler cuts the affected subgraph out of
the DAG and constructs a sub-DAG to run.
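
To make the restart flow a bit more concrete, here is a minimal sketch in
plain Python. It is not the AIP-35 implementation: Action,
TaskInstanceRecord, EvaluationOperator.on_signal and handle_signal are
hypothetical names used only for illustration, and the signal key
"model_ready" is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class Action(Enum):
    NONE = "none"
    START = "start"
    STOP = "stop"
    RESTART = "restart"


@dataclass
class TaskInstanceRecord:
    """Hypothetical stand-in for a task instance row in the metadata database."""
    task_id: str
    state: str = "none"
    history: List[str] = field(default_factory=list)  # past states, never cleared


class EvaluationOperator:
    """Hypothetical signal-aware operator: maps a signal key to an action."""
    task_id = "evaluate_model"

    def on_signal(self, signal_key: str) -> Action:
        # Assumption: the online learning operator emits "model_ready".
        return Action.RESTART if signal_key == "model_ready" else Action.NONE


def handle_signal(operator, record: TaskInstanceRecord, signal_key: str) -> Action:
    """Ask the operator which action to take, then update (not clear) the state."""
    action = operator.on_signal(signal_key)
    if action is Action.RESTART:
        if record.state != "none":
            record.history.append(record.state)  # keep the previous run
        record.state = "scheduled"
    return action


# The previous evaluation succeeded; a new model arrives on the same DAGRun.
record = TaskInstanceRecord("evaluate_model", state="success")
action = handle_signal(EvaluationOperator(), record, "model_ready")
ignored = handle_signal(EvaluationOperator(), record, "unrelated_signal")
```

The point of the sketch is only that a RESTART moves the task instance to a
new state while the earlier outcome stays in its history.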

While keeping a history of operator execution, the scheduler could use
both signals and the execution history to define the signal conditions of
the operators. Once the evaluation operator receives a new signal, the
scheduler checks whether the signal and the previous executions satisfy
those conditions. If the received signal meets its condition and the
execution of the task instances also meets the conditions, the scheduler
takes the RESTART action again, so the model can be evaluated repeatedly.
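
As a rough illustration of a condition that looks at both the signal and
the execution history, consider the sketch below. Signal, SignalCondition
and decide are hypothetical names, not part of the proposal's interface,
and the rule "the previous run must have finished" is only an assumed
example of a history-based condition.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Signal:
    key: str    # e.g. "model_ready" (assumed name)
    value: str  # e.g. the version of the emitted model


@dataclass
class SignalCondition:
    """Hypothetical condition: the signal key must match, and the previous
    execution (if any) must have finished before another RESTART is issued."""
    expected_key: str

    def is_met(self, signal: Signal, past_states: List[str]) -> bool:
        signal_ok = signal.key == self.expected_key
        history_ok = not past_states or past_states[-1] in ("success", "failed")
        return signal_ok and history_ok


def decide(condition: SignalCondition, signal: Signal, past_states: List[str]) -> str:
    """Return the action name for one received signal."""
    return "RESTART" if condition.is_met(signal, past_states) else "NONE"


condition = SignalCondition(expected_key="model_ready")
history: List[str] = []
decisions = []
for version, finished_state in [("v1", "success"), ("v2", "success")]:
    decisions.append(decide(condition, Signal("model_ready", version), history))
    history.append(finished_state)  # the evaluation run completes and is recorded

# A signal that arrives while the last run is still in progress is not acted on.
history.append("running")
blocked = decide(condition, Signal("model_ready", "v3"), history)
```

Each new model signal leads to another RESTART, and the list of past states
keeps growing instead of being reset.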

To support signal-based triggers, signal-based scheduling requires
introducing SignalOperator for the operators that send signals. With
SignalOperator, the scheduler can use the operator's signals and the
corresponding conditions to decide the operator's action. Whether the
relationship between upstream and downstream tasks is one-to-one or
one-to-many, the scheduler only checks whether the signals received by the
SignalOperator meet the signal conditions, and which action to take when
they do.
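
For the one-to-many case, a small sketch of what "only checks the
conditions" could look like is below. The registry layout, the task ids and
the signal keys are all assumptions made up for this example;
actions_for_signal is a hypothetical helper, not an Airflow API.

```python
from typing import Callable, Dict

# Hypothetical registry: task_id -> predicate over the signal key.
Condition = Callable[[str], bool]


def actions_for_signal(signal_key: str, conditions: Dict[str, Condition]) -> Dict[str, str]:
    """Check each operator's own condition; the DAG edge shape is never consulted."""
    return {
        task_id: ("RESTART" if is_met(signal_key) else "NONE")
        for task_id, is_met in conditions.items()
    }


conditions: Dict[str, Condition] = {
    "evaluate_model": lambda key: key == "model_ready",
    "archive_model": lambda key: key == "model_ready",
    "reload_serving": lambda key: key == "model_validated",
}

# One signal from the online learning job fans out to every operator whose
# condition it satisfies.
result = actions_for_signal("model_ready", conditions)
```

One signal can trigger any number of downstream operators, because each
operator is evaluated independently against its own condition.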

By the way, signal-based scheduling needs a few more things to map signals
to corresponding actions.

Regards,
Nicholas Jiang

On Wed, Jun 17, 2020 at 12:00 AM Gerard Casas Saez
<gcasass...@twitter.com.invalid> wrote:

> That looks interesting. My main worry is how to handle multiple executions
> of Signal based operators. If I follow your definition correctly, the idea
> is to run multiple evaluations of the online trained model (on a
> permanently running DAGRun). So what happens when you have triggered the
> downstream operators using a signal once? Do you clear state of the
> operators once a new signal is generated? Do you generate a new DAGRun?
>
> How do you evaluate the model several times while keeping a history of
> operator execution?
>
> Related to this, TFX has a similar proposal for ASYNC DAGs, which
> basically describe something similar to what you are proposing in this AIP:
> https://github.com/tensorflow/community/pull/253 (interesting read as
> it’s also related to the ML field).
>
> My main concern would be that you may need to change a few more things to
> support signal based triggers, as the relationship between upstream tasks
> to downstream tasks is no longer 1:1 but 1:many.
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
> On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > Hello everyone,
> >
> > Sending a message to everyone and collecting feedback on the AIP-35 on
> > adding signal-based scheduling. This was previously briefly mentioned in
> > the discussion of development slack channel. The key motivation of this
> > proposal is to support a mixture of batch and stream jobs in the same
> > workflow.
> >
> > In practice, there are many business logics that need collaboration
> > between streaming and batch jobs. For example, in machine learning, an
> > online learning job is a streaming job running forever. It emits a model
> > periodically and a batch evaluate job should run to validate each model.
> > So this is a streaming-triggered-batch case. If we continue from the
> > above example, once the model passes the validation, the model needs to
> > be deployed to a serving job. That serving job could be a streaming job
> > that keeps polling records from Kafka and uses a model to do prediction.
> > And the availability of a new model requires that streaming prediction
> > job either to restart, or to reload the new model on the fly. In either
> > case, it is a batch-to-streaming job triggering.
> >
> > At this point above, people basically have to invent their own way to
> > deal with such workflows consisting of both batch and streaming jobs. I
> > think having a system that can help smoothly work with a mixture of
> > streaming and batch jobs is valuable here.
> >
> > This AIP-35 focuses on signal-based scheduling to let the operators send
> > signals to the scheduler to trigger a scheduling action, such as starting
> > jobs, stopping jobs and restarting jobs. With the change of scheduling
> > mechanism, a streaming job can send signals to the scheduler to indicate
> > the state change of the dependencies.
> >
> > Signal-based scheduling allows the scheduler to know the change of the
> > dependency state immediately without periodically querying the metadata
> > database. This also allows potential support for richer scheduling
> > semantics such as periodic execution and manual trigger at per operator
> > granularity.
> >
> > Changes proposed:
> >
> > - Signals are used to define conditions that must be met to run an
> > operator. State change of the upstream tasks is one type of the signals.
> > There may be other types of signals. The scheduler may take different
> > actions when receiving different signals. To let the operators take
> > signals as their starting condition, we propose to introduce
> > SignalOperator which is mentioned in the public interface section.
> > - A notification service is necessary to help receive and propagate the
> > signals from the operators and other sources to the scheduler. Upon
> > receiving a signal, the scheduler can take action according to the
> > predefined signal-based conditions on the operators. Therefore we propose
> > to introduce a Signal Notification Service component to Airflow.
> >
> > Please see related documents and PRs for details:
> >
> > AIP: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > Issue: https://github.com/apache/airflow/issues/9321
> >
> > Please let me know if there are any aspects that you agree/disagree with
> > or need more clarification (especially the SignalOperator and
> > SignalService). Any comments are welcome and I am looking forward to it!
> >
> > Thanks,
> > Nicholas
>
