I have a simple pipeline that consists of two tasks. The second task fetches a file over FTP and stores it on Google Cloud Storage. The first task is an FTP sensor that waits for the file to be ready. I am wondering if I can pass the execution date to the sensor. Right now, my code builds the path from dag.start_date, which won't work for later runs.
Here is my code:

import datetime

import criteo_get  # local helper module
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.ftp_sensor import FTPSensor

default_args = {
    'owner': 'Henry',
    'end_date': datetime.datetime(2019, 3, 20),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
}

def load_criteo(**kwargs):
    date = kwargs['execution_date']
    connection = BaseHook.get_connection("ftp_criteo")
    password = connection.password
    login = connection.login
    criteo_get.load_criteo_to_gcp_storage(date, user_name=login, password=password)

with DAG('criteo_to_storage_v1',
         default_args=default_args,
         schedule_interval='@daily',
         start_date=datetime.datetime(2019, 3, 20),
         ) as dag:

    task_load_criteo = PythonOperator(
        task_id='load_criteo',
        provide_context=True,
        python_callable=load_criteo,
    )

    # need to change this so it pulls in the execution date,
    # not the start_date
    file_sensor = FTPSensor(
        path='/BBY Daily MMO Report {y}.{m}.{d}.csv'.format(
            y=dag.start_date.year,
            m=dag.start_date.month,
            d=dag.start_date.day,
        ),
        ftp_conn_id='ftp_criteo',
        task_id='criteo_file_is_ready',
    )

    file_sensor.set_downstream([task_load_criteo])

-- 
Henry Tremblay
Data Engineer, Best Buy
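For what it's worth, one approach I'm considering is building the path per run instead of at DAG-definition time. This is just a sketch: the criteo_path helper is hypothetical, and it assumes FTPSensor declares path in its template_fields so Jinja resolves it per execution (which I believe it does in recent Airflow releases, but please correct me if not):

    import datetime

    def criteo_path(execution_date):
        """Build the FTP path for a given execution date.

        Mirrors the '{y}.{m}.{d}' formatting in the DAG above
        (no zero padding on month/day).
        """
        return '/BBY Daily MMO Report {y}.{m}.{d}.csv'.format(
            y=execution_date.year,
            m=execution_date.month,
            d=execution_date.day,
        )

    # If path is indeed templated, the equivalent inside the DAG would be
    # to hand FTPSensor a Jinja string and let Airflow fill it in per run:
    #   path=('/BBY Daily MMO Report {{ execution_date.year }}.'
    #         '{{ execution_date.month }}.{{ execution_date.day }}.csv')

    print(criteo_path(datetime.datetime(2019, 3, 20)))
    # /BBY Daily MMO Report 2019.3.20.csv

Does the templated-string version look like the right way to do this?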