Hi,
How can I create dependencies between EMR steps? Do I need an
EmrStepSensor (a step watcher) after each step, as in option 1 below, or
can the EmrAddStepsOperator tasks depend on each other directly, without
a watcher in between, as in option 2? Something like this:

step1 = EmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)

step2 = EmrAddStepsOperator(
    task_id="step2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP2,
    dag=dag,
)

step1_watcher = EmrStepSensor(
    task_id="step_1watcher",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    # Watch the first step id returned by the "step1" task.
    step_id="{{ task_instance.xcom_pull(task_ids='step1', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

step2_watcher = EmrStepSensor(
    task_id="step_2watcher",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    # Watch the first step id returned by the "step2" task.
    step_id="{{ task_instance.xcom_pull(task_ids='step2', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)
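
Since every task repeats almost the same XCom template, the strings
could be built by a small helper so that each watcher points at its own
add-steps task. This is only a sketch: xcom_template, JOB_FLOW_ID,
STEP1_ID and STEP2_ID are hypothetical names I made up, not part of
Airflow, and the task ids are the ones used in this DAG.

```python
from typing import Optional


def xcom_template(task_id: str, index: Optional[int] = None) -> str:
    """Build a Jinja template that pulls `return_value` from `task_id`.

    Hypothetical helper (not an Airflow API): it only formats the string
    that Airflow renders later, at task run time.
    """
    suffix = "" if index is None else f"[{index}]"
    return (
        "{{ task_instance.xcom_pull(task_ids='%s', key='return_value')%s }}"
        % (task_id, suffix)
    )


# The cluster id is returned by the create_emr_cluster task.
JOB_FLOW_ID = xcom_template("create_emr_cluster")

# Each add-steps task returns a list of step ids; take the first one.
STEP1_ID = xcom_template("step1", index=0)
STEP2_ID = xcom_template("step2", index=0)
```

The tasks would then use job_flow_id=JOB_FLOW_ID, and the watchers
step_id=STEP1_ID and step_id=STEP2_ID respectively.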

option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher
>> step2 >> step2_watcher >> terminate

option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >>
step2_watcher >> terminate
