Hi Gerard,

Regarding your question about multiple executions, it's a good point. The
Operator currently has a task_id attribute, which is the same as the operator
id. Therefore, when an operator runs multiple times, the task_id is the same
for every run. So we do need to change more than what is in the current
proposal.

To solve the problem, we need to replace task_id with a task_ids attribute
that supports multiple executions. In signal-based scheduling, an operator
that executes multiple times uses the task_ids attribute to maintain the list
of task instances that belong to the same DagRun. Each time the downstream
operator receives a signal from upstream, the scheduler creates a new task
instance. At that point, the Operator should append the new task instance id
to task_ids.
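
As a rough illustration of the idea above, here is a minimal sketch. It is not
Airflow code and not part of the AIP: SketchOperator, on_signal and the
"<operator_id>__<n>" naming scheme are hypothetical names made up for this
example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchOperator:
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""

    operator_id: str
    # One entry per execution inside the same DagRun (assumed naming scheme).
    task_ids: List[str] = field(default_factory=list)

    def on_signal(self) -> str:
        """Record a new task instance each time an upstream signal arrives."""
        new_task_id = f"{self.operator_id}__{len(self.task_ids) + 1}"
        self.task_ids.append(new_task_id)
        return new_task_id


# Two upstream signals lead to two distinct task instance ids.
evaluate = SketchOperator(operator_id="evaluate_model")
first = evaluate.on_signal()
second = evaluate.on_signal()
print(evaluate.task_ids)
```

The only point of the sketch is that each signal produces a distinct id while
the operator keeps the full history for the DagRun.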

Regards,
Nicholas Jiang

On Wed, Jun 17, 2020 at 2:55 PM Kevin Yang <yrql...@gmail.com> wrote:

> I'm generally supportive of this idea of supporting streaming jobs. At
> Airbnb we have historically run stream jobs on Airflow for years, with some
> hacks of course. Yes, the stream jobs might not be idempotent and so might
> not fit the Airflow paradigm. But I personally would love to see Airflow
> expanded to support more use cases and become the go-to workflow
> orchestration solution for companies.
>
> With that said, I would prefer we carefully build a paved path for stream
> job scheduling in Airflow instead of a solution for one particular use
> case. Besides different scheduling model requirements, stream jobs may need
> more, e.g. much shorter scheduling latency and a singleton guarantee. AIP-35
> aims to solve the issue that downstream may need to be kicked off even while
> the upstream job is still running. This is indeed a real problem, but my
> first impression is that adding an entire signal notification system with a
> metadata DB and touching the core scheduling logic for it might be a bit
> overkill. Maybe it would be enough if the upstream still sends out a
> signal, and we build a hacky operator that runs together with the upstream
> task but performs a no-op until its listener gets a start signal from the
> upstream task.
>
> Just my two cents, in the hope of getting more ideas on this topic.
>
>
> Cheers,
> Kevin Y
>
> On Tue, Jun 16, 2020 at 11:06 PM 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
>
> > Hi Bharath,
> >
> >    Could you please provide the permalink of your previous discussion
> > about trigger based dag runs?
> >
> >    In my opinion, compared with trigger based dag runs, signal based
> > scheduling provides finer-grained triggers to determine whether or not to
> > run the task instances of an Operator. External systems outside of Airflow
> > could use SignalService to send signals to the scheduler to indicate the
> > state change of the operator's dependencies. It's indeed useful for
> > external systems outside of Airflow.
> >
> >  Regards,
> >  Nicholas Jiang
> >
> > On Wed, Jun 17, 2020 at 2:41 AM bharath palaksha <bharath...@gmail.com>
> > wrote:
> >
> > > I had started a similar discussion earlier:
> > > trigger based dag runs, where a sensor instead of a cron expression
> > > tells whether to trigger the dag run or not. This is similar to that.
> > > It is very useful when you have external systems outside of Airflow
> > > that can't be programmed to use the REST API.
> > >
> > > On Tue, Jun 16, 2020, 11:08 PM Chris Palmer <ch...@crpalmer.com> wrote:
> > >
> > > > Nicholas,
> > > >
> > > > Are you saying that you actually have tasks in Airflow that are
> > > > intended to run indefinitely?
> > > >
> > > > That in and of itself seems to be a huge fundamental departure from
> > > > many of the assumptions built into Airflow.
> > > >
> > > > Chris
> > > >
> > > > On Tue, Jun 16, 2020 at 12:00 PM Gerard Casas Saez
> > > > <gcasass...@twitter.com.invalid> wrote:
> > > >
> > > > > That looks interesting. My main worry is how to handle multiple
> > > > > executions of Signal based operators. If I follow your definition
> > > > > correctly, the idea is to run multiple evaluations of the online
> > > > > trained model (on a permanently running DAGRun). So what happens
> > > > > when you have triggered the downstream operators using a signal
> > > > > once? Do you clear the state of the operators once a new signal is
> > > > > generated? Do you generate a new DAGRun?
> > > > >
> > > > > How do you evaluate the model several times while keeping a history
> > > > > of operator execution?
> > > > >
> > > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which
> > > > > basically describes something similar to what you are proposing in
> > > > > this AIP: https://github.com/tensorflow/community/pull/253
> > > > > (interesting read as it’s also related to the ML field).
> > > > >
> > > > > My main concern would be that you may need to change a few more
> > > > > things to support signal based triggers, as the relationship between
> > > > > upstream tasks and downstream tasks is no longer 1:1 but 1:many.
> > > > >
> > > > > Gerard Casas Saez
> > > > > Twitter | Cortex | @casassaez
> > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>, wrote:
> > > > > > Hello everyone,
> > > > > >
> > > > > > I am sending this message to collect feedback on AIP-35, which
> > > > > > proposes adding signal-based scheduling. This was previously
> > > > > > mentioned briefly in the development Slack channel. The key
> > > > > > motivation of this proposal is to support a mixture of batch and
> > > > > > stream jobs in the same workflow.
> > > > > >
> > > > > > In practice, a lot of business logic needs collaboration between
> > > > > > streaming and batch jobs. For example, in machine learning, an
> > > > > > online learning job is a streaming job running forever. It emits a
> > > > > > model periodically, and a batch evaluation job should run to
> > > > > > validate each model. So this is a streaming-triggered-batch case.
> > > > > > If we continue from the above example, once the model passes the
> > > > > > validation, the model needs to be deployed to a serving job. That
> > > > > > serving job could be a streaming job that keeps polling records
> > > > > > from Kafka and uses a model to do prediction. The availability of
> > > > > > a new model requires that streaming prediction job either to
> > > > > > restart or to reload the new model on the fly. In either case, it
> > > > > > is a batch-to-streaming job triggering.
> > > > > >
> > > > > > At this point, people basically have to invent their own way to
> > > > > > deal with such workflows consisting of both batch and streaming
> > > > > > jobs. I think having a system that can help smoothly work with a
> > > > > > mixture of streaming and batch jobs is valuable here.
> > > > > >
> > > > > > This AIP-35 focuses on signal-based scheduling to let the
> > > > > > operators send signals to the scheduler to trigger a scheduling
> > > > > > action, such as starting jobs, stopping jobs and restarting jobs.
> > > > > > With the change of scheduling mechanism, a streaming job can send
> > > > > > signals to the scheduler to indicate the state change of the
> > > > > > dependencies.
> > > > > >
> > > > > > Signal-based scheduling allows the scheduler to know the change of
> > > > > > the dependency state immediately without periodically querying the
> > > > > > metadata database. This also allows potential support for richer
> > > > > > scheduling semantics such as periodic execution and manual trigger
> > > > > > at per operator granularity.
> > > > > >
> > > > > > Changes proposed:
> > > > > >
> > > > > > - Signals are used to define conditions that must be met to run an
> > > > > >   operator. State change of the upstream tasks is one type of
> > > > > >   signal. There may be other types of signals. The scheduler may
> > > > > >   take different actions when receiving different signals. To let
> > > > > >   the operators take signals as their starting condition, we
> > > > > >   propose to introduce SignalOperator, which is mentioned in the
> > > > > >   public interface section.
> > > > > > - A notification service is necessary to help receive and
> > > > > >   propagate the signals from the operators and other sources to
> > > > > >   the scheduler. Upon receiving a signal, the scheduler can take
> > > > > >   action according to the predefined signal-based conditions on
> > > > > >   the operators. Therefore we propose to introduce a Signal
> > > > > >   Notification Service component to Airflow.
> > > > > >
> > > > > > Please see related documents and PRs for details:
> > > > > >
> > > > > > AIP: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
> > > > > > Issue: https://github.com/apache/airflow/issues/9321
> > > > > >
> > > > > > Please let me know if there are any aspects that you
> > > > > > agree/disagree with or need more clarification on (especially the
> > > > > > SignalOperator and SignalService). Any comments are welcome and I
> > > > > > am looking forward to them!
> > > > > >
> > > > > > Thanks,
> > > > > > Nicholas
> > > > >
> > > >
> > >
> >
>
