Hey,

here is my feedback, because I've been thinking about events as well. I
would call it it 'WAITING_FOR_EVENT'. Here are the use-cases I would use it
for:

Have a thread (or process) listen on the Google Audit Log. It contains a
lot of changes on the Google Project (Google DataProc finished, File in
Bucket added, ...) and translate it to Events that Airflow understands.


*DataProc/BigQuery/Storage*

   1. Start DagProc job through the API (long running) - takes a
   parallelism slot
   2. Get JobId
   3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
   4. execute stops and parallelism slot is freed

The google audit log is constantly translating log entries to airflow
events. Airflow event_listener looks for specific events registered (could
be wildcards). As soon as the event match it call's the callback:

   1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with event
   payload
   2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS*
   or register for other event

It could be a timeout

   1. on_event is called with *airflow:timeout:dag_id* timeout event
   2. handle timeout with
      1. throw FAILURE
      2. try to recover by calling DataProc API and re-register new/or same
      event

*Bolke's callback example (mapped to this)*


   1. task x for z does some work
   2. register for event *api:callback:20170101T000000*
   3. execute stops and parallelism slot is freed

The API translated http callbacks into events

   1. on_event is called with *api:callback:20170101T000000* and JSON with
   event payload
   2. handle JSON payload depending on payload throw *FAILURE* or *SUCCESS* or
   register for other event

It could be a timeout

   1. on_event is called with *airflow:timeout:dag_id* timeout event
   2. handle timeout with FAILURE


So, what do you think?


On Wed, Feb 8, 2017 at 2:40 PM Jeremiah Lowin <[email protected]> wrote:

I meant the API -- will check the wiki now. Thanks!

On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <[email protected]> wrote:

> On this proposal? No, not yet. Just popped in my mind yesterday. API there
> is a bit on the wiki.
>
> > On 8 Feb 2017, at 14:31, Jeremiah Lowin <[email protected]> wrote:
> >
> > Makes a lot of sense. At the NY meetup there was considerable interest
in
> > using the API (and quite a few hacks around exposing the CLI!) -- is
> there
> > more complete documentation anywhere?
> >
> > Thanks Bolke
> >
> > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <[email protected]> wrote:
> >
> >> Hi All,
> >>
> >> Now that we have an API in place. I would like to propose a new state
> for
> >> tasks named “WAITING_ON_CALLBACK”. Currently, we have tasks that have a
> >> kind of polling mechanism (ie. Sensors) that wait for an action to
> happen
> >> and check if that action happened by regularly polling a particular
> >> backend. This will always use a slot from one of the workers and could
> >> starve an airflow cluster for resources. What if a callback to Airflow
> >> could happen that task to change its status by calling a callback
> mechanism
> >> without taking up a worker slot. A timeout could (should) be associated
> >> with the required callback so that the task can fail if required. So a
> bit
> >> more visual:
> >>
> >>
> >> Task X from DAG Z  does some work and sets “WAITING_ON_CALLBACK” -> API
> >> post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with payload “set
> status
> >> to SUCCESS”
> >>
> >> DAG Z happily continues.
> >>
> >> Or
> >>
> >> Task X from DAG Z sets “WAITING_ON_CALLBACK” with timeout of 300s ->
> time
> >> passes -> scheduler sets task to FAILED.
> >>
> >>
> >> Any thoughts?
> >>
> >> - Bolke
>
>

-- 
  _/
_/ Alex Van Boxel

Reply via email to