How is the UI going to represent this?

How do you navigate between them?

There is already the concept of try_number -- can that be used instead
of changing task_id to task_ids?

On Jun 17 2020, at 11:24 am, 蒋晓峰 <thanosxnicho...@gmail.com> wrote:

> Hi Gerard,
>  
> Regarding the question mentioned above, it's a good point. The Operator
> currently contains a task_id attribute which is the same as the operator
> id. Therefore, when a task instance runs multiple times, the task_id
> would be the same, so we do need to change something more than what is
> in the current proposal.
>  
> To solve the problem, we need to change the task_id attribute to a
> task_ids attribute to support multiple executions. In signal-based
> scheduling, an operator that executes multiple times uses task_ids to
> maintain the list of task instances belonging to the same DagRun. Each
> time the downstream operator receives a signal from upstream, the
> scheduler creates a new task instance, and the Operator appends the new
> task instance id to task_ids.
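[Editor's sketch: the task_ids idea described above might look roughly like the following. All names here (Operator, on_upstream_signal) are invented for illustration and are not the actual AIP-35 interface.]

```python
# Sketch of the proposed task_id -> task_ids change: one stable operator id,
# plus a growing list of task instance ids, one per execution in a DagRun.
# Illustrative only; the real Operator class is far more involved.

class Operator:
    def __init__(self, task_id: str):
        self.task_id = task_id            # stable operator id
        self.task_ids: list[str] = []     # one entry per execution in this DagRun
        self._run_counter = 0

    def on_upstream_signal(self) -> str:
        """Each upstream signal creates a new task instance id and records it."""
        self._run_counter += 1
        instance_id = f"{self.task_id}__{self._run_counter}"
        self.task_ids.append(instance_id)
        return instance_id

op = Operator("evaluate_model")
op.on_upstream_signal()
op.on_upstream_signal()
print(op.task_ids)  # ['evaluate_model__1', 'evaluate_model__2']
```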
>  
> Regards,
> Nicholas Jiang
>  
> On Wed, Jun 17, 2020 at 2:55 PM Kevin Yang <yrql...@gmail.com> wrote:
>  
>> I'm generally supportive of this idea of supporting streaming jobs. We
>> at Airbnb have historically run streaming jobs on Airflow for years,
>> with some hacks of course. Granted, streaming jobs may not be idempotent
>> and so may not fit neatly into the Airflow paradigm, but I personally
>> would love to see Airflow expanded to support more use cases and become
>> the go-to workflow orchestration solution for companies.
>>  
>> That said, I would prefer we carefully build a paved path for streaming
>> job scheduling in Airflow instead of a solution for one particular use
>> case. Besides different scheduling model requirements, streaming jobs
>> may need more, e.g. much shorter scheduling latency and singleton
>> guarantees. AIP-35 aims to solve the issue that downstream tasks may
>> need to be kicked off even while the upstream job is still running. This
>> is indeed a solid problem, but my first impression is that adding an
>> entire signal notification system with a metadata DB and touching the
>> core scheduling logic for it might be a bit of an overkill. Maybe it
>> would be enough if the upstream still sends out a signal, and we build a
>> hacky operator that runs together with the upstream task but performs a
>> no-op until its listener gets a start signal from the upstream task.
>>  
>> Just my two cents with hope to get more ideas on this topic.
>>  
>>  
>> Cheers,
>> Kevin Y
>>  
>> On Tue, Jun 16, 2020 at 11:06 PM 蒋晓峰 <thanosxnicho...@gmail.com> wrote:
>>  
>> > Hi Bharath,
>> >
>> >    Could you please provide the permalink to your previous discussion
>> > about trigger-based DAG runs?
>> >
>> >    In my opinion, compared with trigger-based DAG runs, signal-based
>> > scheduling provides finer-grained triggers to determine whether or not
>> > to run the task instances of an Operator. External systems outside of
>> > Airflow could use the SignalService to send signals to the scheduler
>> > to indicate a state change of the operator's dependencies, which makes
>> > it useful for such systems as well.
>> >
>> >  Regards,
>> >  Nicholas Jiang
>> >
>> > On Wed, Jun 17, 2020 at 2:41 AM bharath palaksha <bharath...@gmail.com>
>> > wrote:
>> >
>> > > I had started a similar discussion earlier: trigger-based DAG runs,
>> > > where a sensor, instead of a cron expression, tells whether or not
>> > > to trigger the DAG run. This is similar to that. It is very useful
>> > > when you have external systems outside of Airflow which can't be
>> > > programmed to use the REST API.
>> > >
>> > > On Tue, Jun 16, 2020, 11:08 PM Chris Palmer <ch...@crpalmer.com>
>> wrote:
>> > >
>> > > > Nicholas,
>> > > >
>> > > > Are you saying that you actually have tasks in Airflow that are
>> > intended
>> > > to
>> > > > run indefinitely?
>> > > >
>> > > > That in and of itself seems to be a huge fundamental departure from
>> > > > many of the assumptions built into Airflow.
>> > > >
>> > > > Chris
>> > > >
>> > > > On Tue, Jun 16, 2020 at 12:00 PM Gerard Casas Saez
>> > > > <gcasass...@twitter.com.invalid> wrote:
>> > > >
>> > > > > That looks interesting. My main worry is how to handle multiple
>> > > > > executions of signal-based operators. If I follow your definition
>> > > > > correctly, the idea is to run multiple evaluations of the online
>> > > > > trained model (on a permanently running DagRun). So what happens
>> > > > > once you have triggered the downstream operators using a signal?
>> > > > > Do you clear the state of the operators once a new signal is
>> > > > > generated? Do you generate a new DagRun?
>> > > > >
>> > > > > How do you evaluate the model several times while keeping a
>> > > > > history of operator execution?
>> > > > >
>> > > > > Related to this, TFX has a similar proposal for ASYNC DAGs, which
>> > > > > basically describes something similar to what you are proposing
>> > > > > in this AIP: https://github.com/tensorflow/community/pull/253
>> > > > > (an interesting read as it's also related to the ML field).
>> > > > >
>> > > > > My main concern would be that you may need to change a few more
>> > > > > things to support signal-based triggers, as the relationship
>> > > > > between upstream tasks and downstream tasks is no longer 1:1 but
>> > > > > 1:many.
>> > > > >
>> > > > > Gerard Casas Saez
>> > > > > Twitter | Cortex | @casassaez
>> > > > > On Jun 16, 2020, 7:00 AM -0600, 蒋晓峰 <thanosxnicho...@gmail.com>,
>> > > wrote:
>> > > > > > Hello everyone,
>> > > > > >
>> > > > > > Sending a message to everyone to collect feedback on AIP-35,
>> > > > > > which adds signal-based scheduling. This was previously briefly
>> > > > > > mentioned in the development Slack channel. The key motivation
>> > > > > > of this proposal is to support a mixture of batch and streaming
>> > > > > > jobs in the same workflow.
>> > > > > >
>> > > > > > In practice, much business logic needs collaboration between
>> > > > > > streaming and batch jobs. For example, in machine learning, an
>> > > > > > online learning job is a streaming job that runs forever. It
>> > > > > > emits a model periodically, and a batch evaluation job should
>> > > > > > run to validate each model. This is a streaming-triggers-batch
>> > > > > > case. Continuing the example, once a model passes validation,
>> > > > > > it needs to be deployed to a serving job. That serving job
>> > > > > > could be a streaming job that keeps polling records from Kafka
>> > > > > > and uses the model to make predictions. The availability of a
>> > > > > > new model requires the streaming prediction job either to
>> > > > > > restart or to reload the new model on the fly. In either case,
>> > > > > > it is a batch-triggers-streaming case.
>> > > > > >
>> > > > > > Today, people basically have to invent their own way to deal
>> > > > > > with such workflows consisting of both batch and streaming
>> > > > > > jobs. I think having a system that smoothly handles a mixture
>> > > > > > of streaming and batch jobs is valuable here.
>> > > > > >
>> > > > > > AIP-35 focuses on signal-based scheduling, which lets operators
>> > > > > > send signals to the scheduler to trigger a scheduling action,
>> > > > > > such as starting, stopping, or restarting jobs. With this
>> > > > > > change to the scheduling mechanism, a streaming job can send
>> > > > > > signals to the scheduler to indicate a state change in its
>> > > > > > dependencies.
>> > > > > >
>> > > > > > Signal-based scheduling allows the scheduler to learn of a
>> > > > > > dependency state change immediately, without periodically
>> > > > > > querying the metadata database. It also opens the door to
>> > > > > > richer scheduling semantics such as periodic execution and
>> > > > > > manual triggering at per-operator granularity.
>> > > > > >
>> > > > > > Changes proposed:
>> > > > > >
>> > > > > > - Signals define conditions that must be met to run an
>> > > > > > operator. A state change of an upstream task is one type of
>> > > > > > signal; there may be others, and the scheduler may take
>> > > > > > different actions when receiving different signals. To let
>> > > > > > operators take signals as their starting condition, we propose
>> > > > > > to introduce a SignalOperator, described in the public
>> > > > > > interface section.
>> > > > > > - A notification service is necessary to receive and propagate
>> > > > > > signals from operators and other sources to the scheduler.
>> > > > > > Upon receiving a signal, the scheduler can take action
>> > > > > > according to the predefined signal-based conditions on the
>> > > > > > operators. We therefore propose to introduce a Signal
>> > > > > > Notification Service component to Airflow.
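[Editor's sketch: the signal flow the two bullets describe could be pictured as a small publish/subscribe hub between operators and the scheduler. The class and method names below are invented for illustration; see the AIP for the actual SignalService interface.]

```python
# Minimal pub/sub sketch: operators (and external systems) publish signals,
# and the scheduler subscribes so it reacts immediately instead of
# periodically polling the metadata database.

from collections import defaultdict
from typing import Callable

class SignalNotificationService:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, signal_name: str, handler: Callable[[dict], None]) -> None:
        """Register a handler to be called whenever signal_name is sent."""
        self._subscribers[signal_name].append(handler)

    def send(self, signal_name: str, payload: dict) -> None:
        """Deliver a signal to every subscribed handler."""
        for handler in self._subscribers[signal_name]:
            handler(payload)

# The scheduler subscribes to an upstream state-change signal and schedules
# the downstream task as soon as the signal arrives:
scheduled = []
service = SignalNotificationService()
service.subscribe("model_exported", lambda p: scheduled.append(p["task"]))
service.send("model_exported", {"task": "evaluate_model"})
print(scheduled)  # ['evaluate_model']
```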
>> > > > > >
>> > > > > > Please see the related documents and PRs for details:
>> > > > > >
>> > > > > > AIP:
>> > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-35+Add+Signal+Based+Scheduling+To+Airflow
>> > > > > > Issue: https://github.com/apache/airflow/issues/9321
>> > > > > >
>> > > > > > Please let me know if there are any aspects you agree or
>> > > > > > disagree with, or that need more clarification (especially the
>> > > > > > SignalOperator and SignalService). Any comments are welcome,
>> > > > > > and I look forward to them!
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Nicholas
>> > > > >
>> > > >
>> > >
>> >
>>  
>
