Chiming in here.

* I think the term callback is a bit confusing, it collides with a
different definition in the javascript world
* I like the idea of a status that can only be altered externally (REST /
CLI / sqla / ...) and that the scheduler simply disregards (and probably it
handles the timeout too). Could be simply `ON_HOLD` or more verbose
`WAITING_FOR_EXTERNAL_TRIGGER`

I have a complementary / overlapping proposal in the area of rationalizing
the number of worker slots. My idea was to add an argument to
`BaseSensorOperator` to define whether the scheduler or a worker should
eval the condition. `evaled_by_worker=True`. If set to `False`, the sensor
would be evaluated by the scheduler as it processes the dependencies,
potentially liberating many worker slots. This of course assumes a
lightweight sensor.

Max


On Wed, Feb 8, 2017 at 12:45 PM, Alex Van Boxel <a...@vanboxel.be> wrote:

> Hey,
>
> here is my feedback, because I've been thinking about events as well. I
> would call it it 'WAITING_FOR_EVENT'. Here are the use-cases I would use it
> for:
>
> Have a thread (or process) listen on the Google Audit Log. It contains a
> lot of changes on the Google Project (Google DataProc finished, File in
> Bucket added, ...) and translate it to Events that Airflow understands.
>
>
> *DataProc/BigQuery/Storage*
>
>    1. Start DagProc job through the API (long running) - takes a
>    parallelism slot
>    2. Get JobId
>    3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
>    4. execute stops and parallelism slot is freed
>
> The google audit log is constantly translating log entries to airflow
> events. Airflow event_listener looks for specific events registered (could
> be wildcards). As soon as the event match it call's the callback:
>
>    1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with event
>    payload
>    2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS*
>    or register for other event
>
> It could be a timeout
>
>    1. on_event is called with *airflow:timeout:dag_id* timeout event
>    2. handle timeout with
>       1. throw FAILURE
>       2. try to recover by calling DataProc API and re-register new/or same
>       event
>
> *Bolke's callback example (mapped to this)*
>
>
>    1. task x for z does some work
>    2. register for event *api:callback:20170101T000000*
>    3. execute stops and parallelism slot is freed
>
> The API translated http callbacks into events
>
>    1. on_event is called with *api:callback:20170101T000000* and JSON with
>    event payload
>    2. handle JSON payload depending on payload throw *FAILURE* or
> *SUCCESS* or
>    register for other event
>
> It could be a timeout
>
>    1. on_event is called with *airflow:timeout:dag_id* timeout event
>    2. handle timeout with FAILURE
>
>
> So, what do you think?
>
>
> On Wed, Feb 8, 2017 at 2:40 PM Jeremiah Lowin <jlo...@apache.org> wrote:
>
> I meant the API -- will check the wiki now. Thanks!
>
> On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>
> > On this proposal? No, not yet. Just popped in my mind yesterday. API
> there
> > is a bit on the wiki.
> >
> > > On 8 Feb 2017, at 14:31, Jeremiah Lowin <jlo...@apache.org> wrote:
> > >
> > > Makes a lot of sense. At the NY meetup there was considerable interest
> in
> > > using the API (and quite a few hacks around exposing the CLI!) -- is
> > there
> > > more complete documentation anywhere?
> > >
> > > Thanks Bolke
> > >
> > > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bdbr...@gmail.com>
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> Now that we have an API in place. I would like to propose a new state
> > for
> > >> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have
> a
> > >> kind of polling mechanism (ie. Sensors) that wait for an action to
> > happen
> > >> and check if that action happened by regularly polling a particular
> > >> backend. This will always use a slot from one of the workers and could
> > >> starve an airflow cluster for resources. What if a callback to Airflow
> > >> could happen that task to change its status by calling a callback
> > mechanism
> > >> without taking up a worker slot. A timeout could (should) be
> associated
> > >> with the required callback so that the task can fail if required. So a
> > bit
> > >> more visual:
> > >>
> > >>
> > >> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” ->
> API
> > >> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set
> > status
> > >> to SUCCESS”
> > >>
> > >> DAG Z happily continues.
> > >>
> > >> Or
> > >>
> > >> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s ->
> > time
> > >> passes -> scheduler sets task to FAILED.
> > >>
> > >>
> > >> Any thoughts?
> > >>
> > >> - Bolke
> >
> >
>
> --
>   _/
> _/ Alex Van Boxel
>

Reply via email to