Re: Ingest daily data, but delivery is always delayed by two days

2018-10-12 Thread James Meickle
For something to add to Airflow itself: I would love a more flexible
mapping between data time and processing time. The default is "n-1" (day
over day, you're aiming to process yesterday's data), but people post other
use cases on this mailing list quite frequently.
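
For context, the workaround people usually reach for today is to shift the
templated dates by the known lag, something like the fragment below (the bucket
layout and task id are made up, it assumes it is dropped into an existing daily
DAG, and it has exactly the drawback Ash describes below: the execution date no
longer matches the data being picked up):

    # Hypothetical fragment: fake an "n-2" mapping by shifting the templated date.
    from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10.x import path

    await_files = S3KeySensor(
        task_id='await_files_shifted',
        # ds is the templated execution date (YYYY-MM-DD); shift it back two days
        # and reformat it to match a year/month/day key layout.
        bucket_key=("s3://bucket/raw/table1/"
                    "{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/SUCCESS"),
        poke_interval=60 * 60,
        timeout=60 * 60 * 24,
    )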

On Fri, Oct 12, 2018 at 7:46 AM Faouz El Fassi  wrote:

> What about an exponential backoff on the poke interval?
>
> On Fri, 12 Oct 2018, 13:01 Ash Berlin-Taylor,  wrote:
>
> > That would work for some of our other use cases (and has been an idea in
> > our backlog for months) but not this case as we're reading from someone
> > else's bucket so can't set up notifications etc. :(
> >
> > -ash
> >
> > > On 12 Oct 2018, at 11:57, Bolke de Bruin  wrote:
> > >
> > > S3 Bucket notification that triggers a dag?
> > >
> > > Sent from my iPad
> > >
> > >> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor  wrote the following:
> > >>
> > >> A lot of our dags are ingesting data (usually daily or weekly) from
> > >> suppliers, and they are universally late.
> > >>
> > >> In the case I'm setting up now the delivery lag is about 30 hours -
> > >> data for 2018-10-10 turned up at 2018-10-12 05:43.
> > >>
> > >> I was going to just set this up with an S3KeySensor and a daily
> > >> schedule, but I'm wondering if anyone has any other bright ideas for a
> > >> better way of handling this sort of case:
> > >>
> > >>   dag = DAG(
> > >>       DAG_ID,
> > >>       default_args=args,
> > >>       start_date=args['start_date'],
> > >>       concurrency=1,
> > >>       schedule_interval='@daily',
> > >>       params={'country': cc}
> > >>   )
> > >>
> > >>   with dag:
> > >>       task = S3KeySensor(
> > >>           task_id="await_files",
> > >>           bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
> > >>           poke_interval=60 * 60 * 2,
> > >>           timeout=60 * 60 * 72,
> > >>       )
> > >>
> > >> That S3 key sensor is _going_ to fail the first 18 times or so it runs,
> > >> which just seems silly.
> > >>
> > >> One option could be to use `ds_add` or similar on the execution date,
> > >> but I don't like breaking the (obvious) link between execution date and
> > >> which files it picks up, so I've ruled out this option.
> > >>
> > >> I could use a Time(Delta)Sensor to just delay the start of the
> > >> checking. I guess with the new change in master to make sensors yield
> > >> their execution slots, that's not a terrible plan.
> > >>
> > >> Does anyone else have any other ideas, including possible things we
> > >> could add to Airflow itself?
> > >>
> > >> -ash
> > >>
> >
> >
>


Re: Ingest daily data, but delivery is always delayed by two days

2018-10-12 Thread Faouz El Fassi
What about an exponential backoff on the poke interval?
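
To make that concrete, here's a rough, untested sketch (BackoffS3KeySensor and
max_poke_interval are made-up names, and it replaces the normal
BaseSensorOperator poke loop, so it loses things like reschedule mode):

    from datetime import datetime, timedelta
    from time import sleep

    from airflow.exceptions import AirflowSensorTimeout
    from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10.x import path


    class BackoffS3KeySensor(S3KeySensor):
        """Polls like S3KeySensor, but doubles the wait after every missed poke."""

        def __init__(self, max_poke_interval=60 * 60 * 4, *args, **kwargs):
            super(BackoffS3KeySensor, self).__init__(*args, **kwargs)
            self.max_poke_interval = max_poke_interval

        def execute(self, context):
            started_at = datetime.utcnow()
            interval = self.poke_interval
            while not self.poke(context):
                if datetime.utcnow() - started_at > timedelta(seconds=self.timeout):
                    raise AirflowSensorTimeout("Key never appeared within the timeout")
                sleep(interval)
                # Back off: wait twice as long before the next poke, up to the cap.
                interval = min(interval * 2, self.max_poke_interval)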

On Fri, 12 Oct 2018, 13:01 Ash Berlin-Taylor,  wrote:

> That would work for some of our other use cases (and has been an idea in
> our backlog for months) but not this case as we're reading from someone
> else's bucket so can't set up notifications etc. :(
>
> -ash
>
> > On 12 Oct 2018, at 11:57, Bolke de Bruin  wrote:
> >
> > S3 Bucket notification that triggers a dag?
> >
> > Sent from my iPad
> >
> >> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor  wrote the following:
> >>
> >> A lot of our dags are ingesting data (usually daily or weekly) from
> >> suppliers, and they are universally late.
> >>
> >> In the case I'm setting up now the delivery lag is about 30 hours - data
> >> for 2018-10-10 turned up at 2018-10-12 05:43.
> >>
> >> I was going to just set this up with an S3KeySensor and a daily
> >> schedule, but I'm wondering if anyone has any other bright ideas for a
> >> better way of handling this sort of case:
> >>
> >>   dag = DAG(
> >>       DAG_ID,
> >>       default_args=args,
> >>       start_date=args['start_date'],
> >>       concurrency=1,
> >>       schedule_interval='@daily',
> >>       params={'country': cc}
> >>   )
> >>
> >>   with dag:
> >>       task = S3KeySensor(
> >>           task_id="await_files",
> >>           bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
> >>           poke_interval=60 * 60 * 2,
> >>           timeout=60 * 60 * 72,
> >>       )
> >>
> >> That S3 key sensor is _going_ to fail the first 18 times or so it runs,
> >> which just seems silly.
> >>
> >> One option could be to use `ds_add` or similar on the execution date,
> >> but I don't like breaking the (obvious) link between execution date and
> >> which files it picks up, so I've ruled out this option.
> >>
> >> I could use a Time(Delta)Sensor to just delay the start of the
> >> checking. I guess with the new change in master to make sensors yield
> >> their execution slots, that's not a terrible plan.
> >>
> >> Does anyone else have any other ideas, including possible things we
> >> could add to Airflow itself?
> >>
> >> -ash
> >>
>
>


Re: Ingest daily data, but delivery is always delayed by two days

2018-10-12 Thread Ash Berlin-Taylor
That would work for some of our other use cases (and has been an idea in our
backlog for months) but not this case as we're reading from someone else's 
bucket so can't set up notifications etc. :(

-ash

> On 12 Oct 2018, at 11:57, Bolke de Bruin  wrote:
> 
> S3 Bucket notification that triggers a dag?
> 
> Sent from my iPad
> 
>> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor  wrote the following:
>> 
>> A lot of our dags are ingesting data (usually daily or weekly) from 
>> suppliers, and they are universally late.
>> 
>> In the case I'm setting up now the delivery lag is about 30 hours - data for
>> 2018-10-10 turned up at 2018-10-12 05:43.
>> 
>> I was going to just set this up with an S3KeySensor and a daily schedule, 
>> but I'm wondering if anyone has any other bright ideas for a better way of 
>> handling this sort of case:
>> 
>>   dag = DAG(
>>       DAG_ID,
>>       default_args=args,
>>       start_date=args['start_date'],
>>       concurrency=1,
>>       schedule_interval='@daily',
>>       params={'country': cc}
>>   )
>>
>>   with dag:
>>       task = S3KeySensor(
>>           task_id="await_files",
>>           bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
>>           poke_interval=60 * 60 * 2,
>>           timeout=60 * 60 * 72,
>>       )
>> 
>> That S3 key sensor is _going_ to fail the first 18 times or so it runs,
>> which just seems silly.
>> 
>> One option could be to use `ds_add` or similar on the execution date, but I 
>> don't like breaking the (obvious) link between execution date and which 
>> files it picks up, so I've ruled out this option.
>> 
>> I could use a Time(Delta)Sensor to just delay the start of the checking. I 
>> guess with the new change in master to make sensors yield their execution 
>> slots, that's not a terrible plan.
>> 
>> Does anyone else have any other ideas, including possible things we could add
>> to Airflow itself?
>> 
>> -ash
>>
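
For the record, here is a minimal sketch of the Time(Delta)Sensor variant
discussed above. The DAG id, default args and country code are placeholders,
the key layout is copied from the snippet, and the 28-hour delta is measured
from the end of the schedule interval (which is what TimeDeltaSensor pokes
against), so for the 2018-10-10 run it starts polling around 04:00 on
2018-10-12, just before the observed 05:43 delivery:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.sensors.s3_key_sensor import S3KeySensor          # Airflow 1.10.x
    from airflow.sensors.time_delta_sensor import TimeDeltaSensor  # import paths

    args = {'owner': 'airflow', 'start_date': datetime(2018, 10, 1)}  # placeholders
    cc = 'gb'

    dag = DAG(
        'ingest_table1_gb',           # placeholder DAG id
        default_args=args,
        start_date=args['start_date'],
        concurrency=1,
        schedule_interval='@daily',
        params={'country': cc},
    )

    with dag:
        # Hold off until we are inside the expected delivery window.
        wait_for_delivery = TimeDeltaSensor(
            task_id='wait_for_delivery',
            delta=timedelta(hours=28),
        )

        # Then poll more often than every 2h, and give up much sooner than 72h.
        await_files = S3KeySensor(
            task_id='await_files',
            bucket_key="s3://bucket/raw/table1-{{ params.country }}/"
                       "{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
            poke_interval=60 * 30,
            timeout=60 * 60 * 12,
        )

        wait_for_delivery >> await_files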