I have a simple pipeline that consists of two tasks. The second task
fetches a file over FTP and stores it on Google Cloud Storage. The first
task is a sensor (FTPSensor) that waits for the file to be ready. I am
wondering if I can pass the execution date to the sensor. Right now, my
code uses dag.start_date, which won't work.

Here is my code:

import datetime

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.ftp_sensor import FTPSensor

default_args = {
    'owner': 'Henry',
    'end_date': datetime.datetime(2019, 3, 20),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
}

def load_criteo(**kwargs):
    date = kwargs['execution_date']
    connection = BaseHook.get_connection("ftp_criteo")
    password = connection.password
    login = connection.login
    # criteo_get is our internal module that pulls the report from FTP
    # and writes it to Google Cloud Storage.
    criteo_get.load_criteo_to_gcp_storage(date, user_name=login,
                                          password=password)


with DAG('criteo_to_storage_v1',
         default_args=default_args,
         schedule_interval='@daily',
         start_date=datetime.datetime(2019, 3, 20),
         ) as dag:
    task_load_criteo = PythonOperator(
        task_id='load_criteo',
        provide_context=True,
        python_callable=load_criteo,
    )


# need to change this so it pulls in the execution date,
# not the start_date
external_dag = FTPSensor(
    path='/BBY Daily MMO Report {y}.{m}.{d}.csv'.format(
        y=dag.start_date.year,
        m=dag.start_date.month,
        d=dag.start_date.day,
    ),
    ftp_conn_id='ftp_criteo',
    task_id='criteo_file_is_ready',
    dag=dag,
)

external_dag.set_downstream([task_load_criteo])
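For clarity, here is the filename pattern I need the sensor to match,
written as a plain-Python helper (criteo_report_path is a hypothetical
name, just to pin down the pattern; no Airflow involved). Note the
month and day components are unpadded:

```python
import datetime


def criteo_report_path(date):
    """Build the FTP path for a given run date.

    The report name uses unpadded month/day components
    (e.g. 2019.3.5, not 2019.03.05).
    """
    return '/BBY Daily MMO Report {y}.{m}.{d}.csv'.format(
        y=date.year, m=date.month, d=date.day)


print(criteo_report_path(datetime.date(2019, 3, 5)))
# /BBY Daily MMO Report 2019.3.5.csv
```

So effectively I want the sensor's path built from the run's execution
date instead of dag.start_date.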

-- 
Henry Tremblay
Data Engineer, Best Buy
