More in line with your first option, we also built a system where upstream tasks write files to S3 and downstream tasks need to be able to find those files. We use dag_id, task_id, and execution_date as a standard way to name file paths, so downstream tasks can discover them deterministically.
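To make that naming convention concrete, here is a minimal sketch. The bucket name "my-bucket" and the helper name build_s3_prefix are illustrative, not part of the system described above:

```python
from datetime import datetime

def build_s3_prefix(dag_id, task_id, execution_date):
    """Deterministic S3 prefix: the upstream task writes here, and any
    downstream task can reconstruct the same path from its own context."""
    return "s3://my-bucket/{}/{}/{}".format(
        dag_id, task_id, execution_date.strftime("%Y-%m-%dT%H-%M"))

# Both sides of the hand-off compute the identical location:
print(build_s3_prefix("etl_dag", "extract", datetime(2016, 7, 13, 19, 25)))
```

Because the path is a pure function of (dag_id, task_id, execution_date), no task ever has to communicate the location explicitly; each run's files land under a distinct, predictable prefix.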
On Wed, Jul 13, 2016 at 7:25 PM, Lance Norskog <lance.nors...@gmail.com> wrote:

> Another way to do this is for t3 to do an S3 directory listing and find
> the latest directories that need processing. When it finishes processing
> a directory from t1, it deletes it. t1 should add a final file to a
> directory that means "I'm finished, take this directory".
>
> This is a very fail-safe approach. It changes the model from a pipeline:
>
>     t1[time 1] supplies data to -> t3[time 1]
>
> to:
>
>     t1[time 1], t1[time 2], ... t1[time n] all supply data to -> t3[time n]
>
> On Mon, Jul 11, 2016 at 5:43 PM, MSR M <msrmaill...@gmail.com> wrote:
> >
> > Hi,
> >
> > I need some advice on solving a problem with local variables in a DAG.
> >
> > I have a DAG with a 30-minute schedule interval. It has three tasks.
> > t1 runs a Python program on a remote EC2 instance. t2 waits for an S3
> > file, created by t1, to appear at a particular location. Once the S3
> > file is available, t3 runs and processes it.
> >
> > I have the date-time as part of my S3 file location:
> >
> >     dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> >     bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/success"
> >
> > t1 runs for more than an hour, so a second DAG run has already started
> > and changed the value of dttm2, and t2 is now trying to locate the
> > file at a different location.
> >
> > To overcome this I am planning to use the {{ execution_date }}
> > template variable instead of computing dttm2 as shown above.
> >
> > In situations like these, is there a better approach to keeping the
> > same value in a variable throughout a particular run of the DAG?
> >
> > Or should I use the XCom feature to push and pull values across tasks,
> > with a different key for each run?
> >
> > Part of my DAG is given below:
> >
> >     dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> >
> >     NL = """
> >     cd /home/ubuntu/Scripts/ ; python2 a11.py {{ params.get("dttm2") }} ;
> >     """
> >
> >     t1 = BashOperator(
> >         task_id='E_Ns_A',
> >         bash_command=NL,
> >         params={'dttm2': dttm2},
> >         retries=3,
> >         dag=dag)
> >
> >     bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/success"
> >
> >     def detect_s3(name, dag=dag, upstream=t1):
> >         task = S3KeySensor(
> >             task_id=name,
> >             bucket_key=bucket_key2,
> >             s3_conn_id='s3conn',
> >             dag=dag,
> >             wildcard_match=True)
> >         task.set_upstream(upstream)
> >         return task
> >
> >     # Spark module to classify data
> >     bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
> >     sparkcmd = """
> >     cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py;
> >     aws s3 cp /home/ubuntu/NC.txt {{ params.get("bkey") }} --region us-west-1
> >     """
> >
> >     t3 = BashOperator(
> >         task_id='CNs',
> >         bash_command=sparkcmd,
> >         params={"bkey": bucket_key3},
> >         retries=1,
> >         dag=dag)
> >
> >     t2 = detect_s3('t2')
> >     t3.set_upstream(t2)
> >
> > Thanks,
> > MSR
>
> --
> Lance Norskog
> lance.nors...@gmail.com
> Redwood City, CA
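The {{ execution_date }} fix MSR anticipates is the usual answer here: execution_date is fixed for the lifetime of a DAG run, while datetime.now() is re-evaluated every time the DAG file is parsed. A small stand-alone sketch of why that matters; render_key is a hypothetical stand-in for Airflow's Jinja rendering of a key like "s3://aaaaa/bbbbb/{{ execution_date.strftime('%Y-%m-%d-%H-%M') }}/success", not an Airflow API:

```python
from datetime import datetime

def render_key(execution_date):
    # What the templated bucket_key would render to for a given run.
    return "s3://aaaaa/bbbbb/{}/success".format(
        execution_date.strftime("%Y-%m-%d-%H-%M"))

# execution_date is fixed per DAG run, so every task in the same run
# resolves the same key, even if a later run starts while t1 is busy.
run_1 = datetime(2016, 7, 11, 17, 30)
run_2 = datetime(2016, 7, 11, 18, 0)
assert render_key(run_1) == render_key(run_1)   # stable within a run
assert render_key(run_1) != render_key(run_2)   # distinct across runs
```

If memory serves, bucket_key is a templated field on S3KeySensor, so the Jinja string can be passed to the sensor directly instead of a pre-computed bucket_key2; worth verifying against your Airflow version.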
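On the XCom question: that also works, and it does not need per-run keys, because XCom values are already scoped to a DAG run. A hedged sketch, assuming Airflow's PythonOperator callable conventions; FakeTI is a test double standing in for Airflow's TaskInstance so the sketch runs on its own:

```python
from datetime import datetime

def push_s3_path(**context):
    # Upstream callable: compute the path once from this run's
    # execution_date and publish it for downstream tasks.
    path = "s3://aaaaa/bbbbb/{}/success".format(
        context["execution_date"].strftime("%Y-%m-%d-%H-%M"))
    context["ti"].xcom_push(key="s3_path", value=path)
    return path

def pull_s3_path(**context):
    # Downstream callable: read the exact path the upstream task stored.
    return context["ti"].xcom_pull(task_ids="E_Ns_A", key="s3_path")

class FakeTI:
    """Minimal stand-in for TaskInstance; real code uses the 'ti' that
    Airflow passes into the callable's context."""
    def __init__(self):
        self._store = {}
    def xcom_push(self, key, value):
        self._store[key] = value
    def xcom_pull(self, task_ids, key):
        return self._store.get(key)

ti = FakeTI()
pushed = push_s3_path(ti=ti, execution_date=datetime(2016, 7, 11, 17, 30))
assert pull_s3_path(ti=ti) == pushed
```

That said, since the path here is derivable from execution_date alone, the templated-key approach is simpler; XCom earns its keep when the upstream task produces a value that cannot be reconstructed downstream.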