A lot of our DAGs ingest data (usually daily or weekly) from suppliers, and 
the deliveries are universally late.

In the case I'm setting up now the delivery lag is about 30 hours - data for 
2018-10-10 turned up at 2018-10-12 05:43.

I was going to just set this up with an S3KeySensor and a daily schedule, but 
I'm wondering if anyone has any other bright ideas for a better way of handling 
this sort of case:

    dag = DAG(
        DAG_ID,
        default_args=args,
        start_date=args['start_date'],
        concurrency=1,
        schedule_interval='@daily',
        params={'country': cc}
    )

    with dag:
        task = S3KeySensor(
            task_id="await_files",
            bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ 
execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
            poke_interval=60 * 60 * 2,
            timeout=60 * 60 * 72,
        )

That S3 key sensor is _going_ to fail the first 18 or so times it pokes, 
which just seems silly.

One option could be to use `ds_add` or similar on the execution date, but I 
don't like breaking the (obvious) link between execution date and which files 
it picks up, so I've ruled that option out.
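
For reference, that ruled-out approach would look something like the sketch 
below - the -2 day offset and the ds_format call are just illustrative, not 
something I'd actually commit to:

    task = S3KeySensor(
        task_id="await_files",
        # e.g. the run for 2018-10-12 would pick up the 2018-10-10 files
        bucket_key=(
            "s3://bucket/raw/table1-{{ params.country }}/"
            "{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', "
            "'%Y/%m/%d') }}/SUCCESS"
        ),
        poke_interval=60 * 60 * 2,
        timeout=60 * 60 * 72,
    )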

I could use a Time(Delta)Sensor to just delay the start of the checking. I 
guess with the new change in master to make sensors yield their execution 
slots, that's not a terrible plan.
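
A rough sketch of that, reusing the dag from above (the 30h delta and the 
1.10-style import paths are assumptions on my part; TimeDeltaSensor waits 
until the end of the schedule interval plus the delta, so ~30h should line up 
roughly with the 05:43 arrival):

    from datetime import timedelta

    from airflow.sensors.s3_key_sensor import S3KeySensor
    from airflow.sensors.time_delta_sensor import TimeDeltaSensor

    with dag:
        wait_for_lag = TimeDeltaSensor(
            task_id="wait_for_delivery_lag",
            # don't start poking S3 until ~30h after the end of the data
            # interval, i.e. around when the files actually tend to land
            delta=timedelta(hours=30),
        )

        await_files = S3KeySensor(
            task_id="await_files",
            bucket_key=(
                "s3://bucket/raw/table1-{{ params.country }}/"
                "{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS"
            ),
            poke_interval=60 * 60 * 2,
            timeout=60 * 60 * 72,
        )

        wait_for_lag >> await_files

Once the reschedule change is released, that first sensor shouldn't tie up a 
worker slot for the whole 30 hours either.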

Does anyone else have any other ideas, including possible things we could add 
to Airflow itself?

-ash
