For something to add to Airflow itself: I would love a more flexible mapping between data time and processing time. The default is "n-1" (day over day, you're aiming to process yesterday's data) but people post other use cases on this mailing list quite frequently.
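The "n-1" default can be made concrete with a little date arithmetic. Below is a minimal sketch of the flexible mapping being asked for; `earliest_processing_time` and its `delivery_lag` parameter are hypothetical names for illustration, not an existing Airflow API:

```python
from datetime import datetime, timedelta

def earliest_processing_time(data_date, schedule_interval=timedelta(days=1),
                             delivery_lag=timedelta(0)):
    """When a run for `data_date` could usefully start.

    Airflow's built-in mapping is the first two terms ("n-1": the run for
    a given day starts once that day's interval closes). The hypothetical
    `delivery_lag` term is the flexible part: a per-DAG offset for
    suppliers that deliver late.
    """
    return data_date + schedule_interval + delivery_lag

# With the ~30 hour lag described later in this thread, the run for
# 2018-10-10 data would not start poking before 2018-10-12 06:00.
```

A per-DAG knob like `delivery_lag` would keep the link between execution date and the files picked up, while shifting only when the work starts.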
On Fri, Oct 12, 2018 at 7:46 AM Faouz El Fassi <fa...@drivy.com> wrote:

> What about an exponential back-off on the poke interval?
>
> On Fri, 12 Oct 2018, 13:01 Ash Berlin-Taylor <a...@apache.org> wrote:
>
> > That would work for some of our other use cases (and has been an idea in
> > our backlog for months), but not this case, as we're reading from someone
> > else's bucket so can't set up notifications etc. :(
> >
> > -ash
> >
> > > On 12 Oct 2018, at 11:57, Bolke de Bruin <bdbr...@gmail.com> wrote:
> > >
> > > An S3 bucket notification that triggers a DAG?
> > >
> > > Sent from my iPad
> > >
> > >> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor <a...@apache.org> wrote:
> > >>
> > >> A lot of our DAGs ingest data (usually daily or weekly) from
> > >> suppliers, and the deliveries are universally late.
> > >>
> > >> In the case I'm setting up now the delivery lag is about 30 hours:
> > >> data for 2018-10-10 turned up at 2018-10-12 05:43.
> > >>
> > >> I was going to just set this up with an S3KeySensor and a daily
> > >> schedule, but I'm wondering if anyone has any other bright ideas for
> > >> a better way of handling this sort of case:
> > >>
> > >> dag = DAG(
> > >>     DAG_ID,
> > >>     default_args=args,
> > >>     start_date=args['start_date'],
> > >>     concurrency=1,
> > >>     schedule_interval='@daily',
> > >>     params={'country': cc},
> > >> )
> > >>
> > >> with dag:
> > >>     task = S3KeySensor(
> > >>         task_id="await_files",
> > >>         bucket_key="s3://bucket/raw/table1-{{ params.country }}/"
> > >>                    "{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
> > >>         poke_interval=60 * 60 * 2,  # poke every 2 hours
> > >>         timeout=60 * 60 * 72,       # give up after 72 hours
> > >>     )
> > >>
> > >> That S3 key sensor is _going_ to fail the first 18 times or so it
> > >> runs, which just seems silly.
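Stepping outside the quoted thread for a moment: Faouz's exponential back-off suggestion above boils down to growing the poke interval geometrically up to a cap. A minimal sketch in plain Python, where `backoff_intervals` is an illustrative helper rather than an Airflow API (later Airflow releases did add an `exponential_backoff` flag to `BaseSensorOperator`):

```python
def backoff_intervals(base=60, factor=2.0, cap=2 * 60 * 60):
    """Yield successive poke intervals in seconds, multiplying by
    `factor` each poke and flattening out once `cap` is reached."""
    interval = float(base)
    while True:
        yield min(interval, cap)
        interval *= factor

# With a 60s base, doubling, and a 2h cap the sensor would poke at
# 60, 120, 240, ... seconds, then settle at 7200s between pokes.
```

A sensor subclass could draw its sleep time from such a generator instead of a fixed `poke_interval`, polling eagerly once the file is plausibly due without hammering S3 for the whole 72-hour window.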
> > >> One option could be to use `ds_add` or similar on the execution
> > >> date, but I don't like breaking the (obvious) link between execution
> > >> date and which files it picks up, so I've ruled that option out.
> > >>
> > >> I could use a Time(Delta)Sensor to just delay the start of the
> > >> checking. I guess with the new change in master to make sensors
> > >> yield their execution slots, that's not a terrible plan.
> > >>
> > >> Does anyone else have any other ideas, including possible things we
> > >> could add to Airflow itself?
> > >>
> > >> -ash
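To put a number on what the Time(Delta)Sensor idea saves: with a fixed poke interval, every poke issued before the expected delivery lag has elapsed is guaranteed to come up empty. A quick back-of-the-envelope helper (`wasted_pokes` is a hypothetical name, purely for illustration):

```python
from datetime import timedelta

def wasted_pokes(delivery_lag, poke_interval):
    """How many pokes are guaranteed to find nothing if the sensor
    starts checking the moment the schedule interval closes, rather
    than after the expected delivery lag."""
    return int(delivery_lag.total_seconds() // poke_interval.total_seconds())
```

With the 2-hour poke interval and ~30-hour lag from this thread, that is 15 guaranteed-empty pokes, in the same ballpark as the "18 times or so" estimated above. A `TimeDeltaSensor` with `delta=timedelta(hours=30)` upstream of the S3KeySensor would skip them, at the cost of hard-coding the lag into the DAG.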