+1 for Laura's suggestion.

We (Agari) also find ourselves in situations where DAG runs end up
overlapping. We pass the execution_date via Jinja templates through the
BashOperator to remote shell commands, on to Spark jobs, and finally into
the Spark output file paths on S3. As Laura points out, the execution
date is what ties the DAG Run to its S3 output files.
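
A minimal sketch of that pattern (the DAG name, bucket, script, and
schedule below are made up):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('example_spark', start_date=datetime(2016, 7, 1),
          schedule_interval='*/30 * * * *')

# {{ ds_nodash }} renders to the run's execution date (YYYYMMDD). It is
# fixed per DAG Run, unlike datetime.now(), so overlapping runs do not
# collide on the same output path.
cmd = """
spark-submit process.py --output s3://my-bucket/output/{{ ds_nodash }}/
"""

run_spark = BashOperator(task_id='run_spark', bash_command=cmd, dag=dag)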

-s

On Thu, Jul 14, 2016 at 8:39 AM, Laura Lorenz <llor...@industrydive.com>
wrote:

> More in line with your first option, we also created a system where we have
> upstream tasks that write files to S3 and want other downstream tasks to be
> able to find those upstream files. We use dag_id, task_id, and
> execution_date as a standard way to name file paths so downstream tasks can
> discover them.
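>
> As a rough sketch (the bucket name and helper below are illustrative,
> not our actual code):
>
> def s3_key(dag_id, task_id, execution_date):
>     # Downstream tasks call this with the *upstream* task_id and the
>     # run's execution_date to rebuild the exact path the upstream
>     # task wrote to.
>     return "s3://our-bucket/%s/%s/%s/data.json" % (
>         dag_id, task_id, execution_date.strftime('%Y-%m-%dT%H-%M'))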
>
> On Wed, Jul 13, 2016 at 7:25 PM, Lance Norskog <lance.nors...@gmail.com>
> wrote:
>
> > Another way to do this is for t3 to do an S3 directory listing and find
> > the latest directories that need processing. When t3 finishes processing
> > a directory from t1, it deletes that directory. t1 should add a final
> > marker file to each directory that means "I'm finished, take this
> > directory".
> >
> > This is a very fail-safe approach. It changes the model from a pipeline:
> >   t1[time 1] supplies data to -> t3[time 1]
> > to:
> >   t1[time 1], t1[time 2], ... t1[time n] all supply data to -> t3[time n]
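> >
> > Roughly, with boto3 (bucket/prefix names and process_prefix are made
> > up; pagination and error handling omitted):
> >
> > import boto3
> >
> > s3 = boto3.client('s3')
> > listing = s3.list_objects_v2(Bucket='my-bucket', Prefix='t1-output/',
> >                              Delimiter='/')
> > for cp in listing.get('CommonPrefixes', []):
> >     prefix = cp['Prefix']   # e.g. 't1-output/2016-07-14-08-30/'
> >     done = s3.list_objects_v2(Bucket='my-bucket',
> >                               Prefix=prefix + '_DONE')
> >     if done.get('KeyCount', 0) > 0:     # t1's "I'm finished" marker
> >         process_prefix(prefix)          # t3's real work goes here
> >         objs = s3.list_objects_v2(Bucket='my-bucket', Prefix=prefix)
> >         s3.delete_objects(Bucket='my-bucket', Delete={
> >             'Objects': [{'Key': o['Key']}
> >                         for o in objs.get('Contents', [])]})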
> >
> > On Mon, Jul 11, 2016 at 5:43 PM, MSR M <msrmaill...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I need some advice on solving a problem with local variables in a DAG.
> > >
> > > I have a DAG (schedule interval: 30 mins) with 3 tasks. t1 runs a
> > > Python program on a remote EC2 instance. t2 waits for an S3 file to
> > > become available at a particular location; this S3 file is created by
> > > t1. Once the S3 file is available, t3 runs and processes the file on S3.
> > >
> > > I have date-time as part of my S3 file location.
> > >
> > > dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> > >
> > > bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
> > >
> > > t1 runs for more than 1 hour, so a second instance of the DAG has
> > > already started and dttm2 is re-evaluated to a new value, so task t2
> > > of the first run ends up looking for the file at a different location.
> > >
> > > To overcome this I am planning to use the {{ execution_date }} template
> > > parameter instead of computing dttm2 as shown above.
> > >
> > > In situations like these, is there a better approach to keep the same
> > > value in a variable throughout a particular run of the DAG?
> > >
> > > Or should I use the XCom feature to push and pull the values across
> > > tasks, with a different key for each run?
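> > >
> > > Something like this is what I have in mind for the template option
> > > (untested):
> > >
> > > # bash_command is templated, so the rendered date is the DAG Run's
> > > # execution_date, not the wall-clock time when the file is parsed.
> > > NL = """
> > >           cd /home/ubuntu/Scripts/ ; python2 a11.py {{ execution_date.strftime('%Y-%m-%d-%H-%M') }} ;
> > > """
> > >
> > > # For the XCom option, t1 could push the S3 path and downstream tasks
> > > # could pull it in their templated commands with
> > > # {{ ti.xcom_pull(task_ids='E_Ns_A') }}.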
> > >
> > >
> > > Part of my DAG is given below:
> > >
> > > #
> > >
> > > dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> > >
> > >
> > > NL = """
> > >           cd /home/ubuntu/Scripts/ ; python2 a11.py {{params.get("dttm2")}} ;
> > > """
> > >
> > > t1 = BashOperator(
> > >     task_id='E_Ns_A',
> > >     bash_command=NL,
> > >     params={'dttm2':dttm2},
> > >     retries=3,
> > >     dag=dag)
> > >
> > > bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
> > >
> > > def detect_s3(name, dag=dag, upstream=t1):
> > >   task = S3KeySensor(
> > >     task_id = name,
> > >     bucket_key=bucket_key2,
> > >     s3_conn_id='s3conn',
> > >     dag=dag,
> > >     wildcard_match=True)
> > >   task.set_upstream(upstream)
> > >   return task
> > >
> > > # Spark module to classify data
> > >
> > > bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
> > > sparkcmd = """
> > >            cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py;
> > >            aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region us-west-1
> > > """
> > >
> > > t3 = BashOperator(
> > >     task_id='CNs',
> > >     bash_command=sparkcmd,
> > >     params={"bkey":bucket_key3},
> > >     retries=1,
> > >     dag=dag)
> > >
> > > t2 = detect_s3('t2')
> > >
> > > t3.set_upstream(t2)
> > >
> > >
> > > Thanks,
> > > MSR
> > >
> >
> >
> >
> > --
> > Lance Norskog
> > lance.nors...@gmail.com
> > Redwood City, CA
> >
>
