More in line with your first option, we also created a system where we have
upstream tasks that write files to S3 and downstream tasks that need to be
able to find those upstream files. We use dag_id, task_id, and
execution_date as a standard way of naming the file paths, so downstream
tasks can derive the location for a given run instead of passing it around.
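
For what it's worth, a rough sketch of that naming convention (the bucket
name and helper function below are made up for illustration):

def upstream_s3_prefix(dag_id, task_id, execution_date):
    # Every task in the same DAG run sees the same execution_date, so
    # producer and consumer can derive the same prefix independently.
    return "s3://my-bucket/{0}/{1}/{2}/".format(
        dag_id, task_id, execution_date.strftime('%Y-%m-%dT%H%M'))

# Or directly in any templated operator field, e.g.
#   "s3://my-bucket/{{ dag.dag_id }}/<upstream_task_id>/{{ ts_nodash }}/"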

On Wed, Jul 13, 2016 at 7:25 PM, Lance Norskog <lance.nors...@gmail.com>
wrote:

> Another way to do this is for t3 to do an S3 directory listing and find the
> latest directories that need processing. When it finishes processing a
> directory from t1, it deletes it. t1 should add a final marker file to each
> directory that means "I'm finished, take this directory".
>
> This is a very fail-safe approach. It changes the model from a pipeline:
>   t1[time 1] supplies data to -> t3[time 1]
> to:
>   t1[time 1], t1[time 2], ... t1[time n] all supply data to -> t3[time n]
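>
> As a rough sketch of that listing-plus-sentinel idea (boto3 and the bucket,
> prefix, and marker names here are illustrative only):
>
> import boto3
>
> s3 = boto3.client('s3')
>
> def finished_directories(bucket='my-bucket', prefix='incoming/'):
>     # "Directories" under the prefix that t1 has marked complete with a
>     # sentinel object (e.g. <dir>/_DONE) are ready for t3 to process.
>     resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/')
>     ready = []
>     for cp in resp.get('CommonPrefixes', []):
>         marker = s3.list_objects_v2(Bucket=bucket, Prefix=cp['Prefix'] + '_DONE')
>         if marker.get('KeyCount', 0) > 0:
>             ready.append(cp['Prefix'])
>     return sorted(ready)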
>
> On Mon, Jul 11, 2016 at 5:43 PM, MSR M <msrmaill...@gmail.com> wrote:
>
> > Hi,
> >
> > I need some advice on solving a problem with local variables in a DAG.
> >
> > I have a DAG with a 30-minute schedule interval. It has 3 tasks: t1 runs a
> > python program on a remote EC2 instance; t2 waits for an S3 file to become
> > available at a particular location (this file is created by t1); once the
> > file is available, t3 runs and processes it.
> >
> > I have the date and time as part of my S3 file location:
> >
> > dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> >
> > bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
> >
> > t1 runs for more than 1 hour, so a second instance of the DAG has already
> > started and it changes the value of the variable dttm2, so the first run's
> > t2 task is trying to locate the file at a different location.
> >
> > To overcome this I am planning to use the {{ execution_date }} template
> > parameter instead of computing dttm2 with datetime.now() as shown above.
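> >
> > Roughly like this, if bucket_key is a templated field of the sensor in my
> > Airflow version (keeping the same bucket path as above):
> >
> > # Sketch: derive the location from the run's own execution_date rather
> > # than datetime.now(), so every task of one DAG run renders the same path.
> > bucket_key2 = ("s3://aaaaa/bbbbb/"
> >                "{{ execution_date.strftime('%Y-%m-%d-%H-%M') }}/sucess")
> >
> > t2 = S3KeySensor(
> >     task_id='t2',
> >     bucket_key=bucket_key2,   # rendered per run if bucket_key is templated
> >     s3_conn_id='s3conn',
> >     wildcard_match=True,
> >     dag=dag)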
> >
> > In situations like these, is there a better approach to keep the same value
> > in a variable throughout a particular run of the DAG?
> >
> > Or should I use the XCom feature to push and pull the values across the
> > tasks, with a different key for each run?
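> >
> > Something like this, if XCom is the way to go (the key name and callable
> > below are only illustrative):
> >
> > from airflow.operators.python_operator import PythonOperator
> >
> > def push_path(**context):
> >     # t1 (or a small helper task) records the exact path it wrote.
> >     path = "s3://aaaaa/bbbbb/%s/sucess" % context['ts_nodash']
> >     context['ti'].xcom_push(key='s3_path', value=path)
> >
> > push = PythonOperator(task_id='push_path', python_callable=push_path,
> >                       provide_context=True, dag=dag)
> >
> > # Downstream tasks pull it back, e.g. inside a templated field:
> > #   "{{ ti.xcom_pull(task_ids='push_path', key='s3_path') }}"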
> >
> >
> > Part of my DAG is given below:
> >
> > #
> >
> > dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> >
> >
> > NL = """
> >           cd /home/ubuntu/Scripts/ ; python2 a11.py {{params.get("dttm2")}} ;
> > """
> >
> > t1 = BashOperator(
> >     task_id='E_Ns_A',
> >     bash_command=NL,
> >     params={'dttm2':dttm2},
> >     retries=3,
> >     dag=dag)
> >
> > bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
> >
> > def detect_s3(name, dag=dag, upstream=t1):
> >   task = S3KeySensor(
> >     task_id = name,
> >     bucket_key=bucket_key2,
> >     s3_conn_id='s3conn',
> >     dag=dag,
> >     wildcard_match=True)
> >   task.set_upstream(upstream)
> >   return task
> >
> > # Spark module to classify data
> >
> > bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
> > sparkcmd = """
> >            cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py;
> >            aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region us-west-1
> > """
> >
> > t3 = BashOperator(
> >     task_id='CNs',
> >     bash_command=sparkcmd,
> >     params={"bkey":bucket_key3},
> >     retries=1,
> >     dag=dag)
> >
> > t2 = detect_s3('t2')
> >
> > t3.set_upstream(t2)
> >
> >
> > Thanks,
> > MSR
> >
>
>
>
> --
> Lance Norskog
> lance.nors...@gmail.com
> Redwood City, CA
>
