Chiming in here.

* I think the term "callback" is a bit confusing; it collides with a
  different definition in the JavaScript world.
* I like the idea of a status that can only be altered externally
  (REST / CLI / sqla / ...) and that the scheduler simply disregards
  (though it should probably still handle the timeout). It could simply
  be `ON_HOLD`, or, more verbosely, `WAITING_FOR_EXTERNAL_TRIGGER`.
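To make the idea concrete: an externally-settable hold state would mean the scheduler skips the task entirely except to enforce its timeout. A minimal Python sketch, with invented names throughout (`ON_HOLD`, `TaskInstance`, `scheduler_pass` are illustrative only, not Airflow API):

```python
# Hypothetical sketch: a scheduler pass that disregards ON_HOLD tasks,
# but still owns the timeout. None of these names exist in Airflow.
from datetime import datetime, timedelta

ON_HOLD = "on_hold"   # assumed new state, settable only via REST / CLI / sqla
FAILED = "failed"

class TaskInstance:
    def __init__(self, state, held_since=None, timeout=timedelta(seconds=300)):
        self.state = state
        self.held_since = held_since
        self.timeout = timeout

def scheduler_pass(ti, now):
    """The scheduler ignores ON_HOLD tasks, except to fail them on timeout."""
    if ti.state != ON_HOLD:
        return "schedule_normally"
    if ti.held_since and now - ti.held_since > ti.timeout:
        ti.state = FAILED          # timed out waiting for the external trigger
        return "timed_out"
    return "skipped"               # no worker slot consumed while waiting
```

Only an external actor (the REST API, CLI, or a direct sqlalchemy update) would ever move the task out of `ON_HOLD` to `SUCCESS`/`FAILED`; the scheduler's sole responsibility is the timeout branch.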
I have a complementary / overlapping proposal in the area of
rationalizing the number of worker slots. My idea was to add an argument
to `BaseSensorOperator` to define whether the scheduler or a worker
should evaluate the condition: `evaled_by_worker=True`. If set to
`False`, the sensor would be evaluated by the scheduler as it processes
the dependencies, potentially liberating many worker slots. This of
course assumes a lightweight sensor.

Max

On Wed, Feb 8, 2017 at 12:45 PM, Alex Van Boxel <a...@vanboxel.be> wrote:

> Hey,
>
> here is my feedback, because I've been thinking about events as well. I
> would call it 'WAITING_FOR_EVENT'. Here are the use cases I would use it
> for:
>
> Have a thread (or process) listen on the Google Audit Log. It contains a
> lot of changes on the Google Project (Google DataProc finished, file in
> bucket added, ...) and translate them to events that Airflow understands.
>
> *DataProc/BigQuery/Storage*
>
>    1. Start DataProc job through the API (long running) - takes a
>       parallelism slot
>    2. Get JobId
>    3. Register for event: *gc:dataproc:job_id_xxxx* with timeout 300s
>    4. execute stops and the parallelism slot is freed
>
> The Google audit log is constantly translating log entries to Airflow
> events. The Airflow event_listener looks for the specific events
> registered (could be wildcards). As soon as an event matches, it calls
> the callback:
>
>    1. on_event is called with *gc:dataproc:job_id_xxxx* and JSON with
>       the event payload
>    2. handle the JSON payload; depending on the payload, throw *FAILURE*
>       or *SUCCESS*, or register for another event
>
> It could be a timeout:
>
>    1. on_event is called with the *airflow:timeout:dag_id* timeout event
>    2. handle the timeout by either
>       1. throwing FAILURE, or
>       2. trying to recover by calling the DataProc API and
>          re-registering a new (or the same) event
>
> *Bolke's callback example (mapped to this)*
>
>    1. task x for z does some work
>    2. register for event *api:callback:20170101T000000*
>    3.
>       execute stops and the parallelism slot is freed
>
> The API translates HTTP callbacks into events:
>
>    1. on_event is called with *api:callback:20170101T000000* and JSON
>       with the event payload
>    2. handle the JSON payload; depending on the payload, throw *FAILURE*
>       or *SUCCESS*, or register for another event
>
> It could be a timeout:
>
>    1. on_event is called with the *airflow:timeout:dag_id* timeout event
>    2. handle the timeout with FAILURE
>
> So, what do you think?
>
> On Wed, Feb 8, 2017 at 2:40 PM Jeremiah Lowin <jlo...@apache.org> wrote:
>
> > I meant the API -- will check the wiki now. Thanks!
> >
> > On Wed, Feb 8, 2017 at 8:33 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
> >
> > > On this proposal? No, not yet. It just popped into my mind yesterday.
> > > The API is a bit on the wiki.
> > >
> > > > On 8 Feb 2017, at 14:31, Jeremiah Lowin <jlo...@apache.org> wrote:
> > > >
> > > > Makes a lot of sense. At the NY meetup there was considerable
> > > > interest in using the API (and quite a few hacks around exposing
> > > > the CLI!) -- is there more complete documentation anywhere?
> > > >
> > > > Thanks Bolke
> > > >
> > > > On Wed, Feb 8, 2017 at 1:36 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> Now that we have an API in place, I would like to propose a new
> > > >> state for tasks named "WAITING_ON_CALLBACK". Currently, we have
> > > >> tasks with a kind of polling mechanism (i.e. Sensors) that wait
> > > >> for an action to happen and check whether it happened by regularly
> > > >> polling a particular backend. This always takes a slot from one of
> > > >> the workers and can starve an Airflow cluster of resources. What
> > > >> if a callback to Airflow could let that task change its status,
> > > >> via a callback mechanism, without taking up a worker slot? A
> > > >> timeout could (should) be associated with the required callback so
> > > >> that the task can fail if needed.
> > > >> So, a bit more visually:
> > > >>
> > > >> Task X from DAG Z does some work and sets "WAITING_ON_CALLBACK" ->
> > > >> API post to /dags/Z/dag_runs/20170101T00:00:00/tasks/X with the
> > > >> payload "set status to SUCCESS"
> > > >>
> > > >> DAG Z happily continues.
> > > >>
> > > >> Or:
> > > >>
> > > >> Task X from DAG Z sets "WAITING_ON_CALLBACK" with a timeout of
> > > >> 300s -> time passes -> the scheduler sets the task to FAILED.
> > > >>
> > > >> Any thoughts?
> > > >>
> > > >> - Bolke
>
> --
> _/
> _/ Alex Van Boxel
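Alex's event-listener idea from the thread above could be sketched roughly as follows. Everything here is invented for illustration (`EventListener`, `register`, `dispatch` are not Airflow code); wildcard matching is done with Python's stdlib `fnmatch`, one plausible way to implement "could be wildcards":

```python
# Hypothetical sketch of an event registry: tasks register interest in an
# event key (optionally a wildcard pattern); the audit-log translator calls
# dispatch() for each incoming event, which fires matching callbacks.
import fnmatch

class EventListener:
    def __init__(self):
        self.registrations = []   # list of (pattern, callback) pairs

    def register(self, pattern, on_event):
        """A waiting task registers an event key, e.g. 'gc:dataproc:job_id_*'."""
        self.registrations.append((pattern, on_event))

    def dispatch(self, event_key, payload):
        """Fire callbacks whose pattern matches; return True if any matched."""
        matched = []
        for pattern, on_event in self.registrations:
            if fnmatch.fnmatch(event_key, pattern):
                on_event(event_key, payload)
                matched.append(pattern)
        # one-shot semantics: a fired registration is dropped unless the
        # callback re-registers (as in the "register for other event" step)
        self.registrations = [r for r in self.registrations
                              if r[0] not in matched]
        return bool(matched)
```

The `on_event` callback would be where the task inspects the JSON payload and throws `FAILURE`/`SUCCESS` or re-registers, and a timeout would simply arrive as another event (`airflow:timeout:dag_id`) through the same `dispatch` path.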
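Max's `evaled_by_worker` suggestion near the top of the thread might look like the sketch below. The classes and the toy `schedule` pass are hypothetical stand-ins, not the real `BaseSensorOperator` or scheduler:

```python
# Hypothetical sketch of the proposed `evaled_by_worker` flag. A lightweight
# sensor opts out of worker execution; the scheduler evaluates its condition
# inline while processing dependencies, so no worker slot is occupied.
class BaseSensorOperator:
    def __init__(self, task_id, evaled_by_worker=True):
        self.task_id = task_id
        self.evaled_by_worker = evaled_by_worker

    def poke(self):
        raise NotImplementedError

class FlagSensor(BaseSensorOperator):
    """A cheap membership check: light enough for the scheduler to evaluate."""
    def __init__(self, task_id, flags, key):
        super().__init__(task_id, evaled_by_worker=False)
        self.flags, self.key = flags, key

    def poke(self):
        return self.key in self.flags

def schedule(sensors):
    """Toy scheduler pass: inline-eval lightweight sensors, queue the rest."""
    ready, queued = [], []
    for s in sensors:
        if s.evaled_by_worker:
            queued.append(s.task_id)     # heavyweight: send to a worker slot
        elif s.poke():
            ready.append(s.task_id)      # condition met, no slot consumed
    return ready, queued
```

A sensor with `evaled_by_worker=False` whose condition is not yet met simply stays waiting until a later scheduler pass, which is exactly the slot-freeing behavior being proposed.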