Hi,
I use a single add_steps task, followed by a separate step sensor for each step it adds.
e.g.
# Example DAG wiring: create an EMR cluster, submit all steps with a single
# add_steps task, watch each step with its own sensor, then tear down.
# `dag`, JOB_FLOW_OVERRIDES and SPARK_STEPS are defined elsewhere.

# Marker task at the head of the pipeline.
start_daily_pipeline = DummyOperator(
    task_id="start_daily_pipeline",
    dag=dag,
)

# Creates the cluster; downstream tasks pull its XCom return_value and use it
# as the job flow id.
cluster_creator = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    aws_conn_id="aws_role_default",
    emr_conn_id="emr_default",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag,
)

# Submits every step in SPARK_STEPS in one go. The sensors below index into
# this task's XCom return_value ([0], [1]) to pick out their own step id.
# NOTE: each template must stay on a single line inside its quotes -- the
# original paste had them hard-wrapped, which is a syntax error.
step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    steps=SPARK_STEPS,
    dag=dag,
)

# One sensor per submitted step: watch_step_1 follows the first step id ...
step1_checker = EmrStepSensor(
    task_id="watch_step_1",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# ... and watch_step_2 follows the second.
step2_checker = EmrStepSensor(
    task_id="watch_step_2",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# Watches the job flow as a whole. EmrJobFlowSensor has no step_id parameter,
# so the stray step_id argument from the original snippet is dropped here.
job_flow_checker = EmrJobFlowSensor(
    task_id="watch_job_flow",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# Terminates the cluster created by create_job_flow.
cluster_remover = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

# Steps are added once, both step sensors run after that, and the job-flow
# sensor and the teardown follow only when both sensors have finished.
start_daily_pipeline >> cluster_creator >> step_adder
step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
From: Avi Levi <[email protected]>
Sent: 03 June 2021 08:21
To: [email protected]
Subject: Create dependencies between emr steps
Hi,
How can I create dependencies between EMR steps? Do I need a step watcher
between each pair of steps, as in option 1 below, or can I skip the watcher and
make the steps depend on each other directly, as in option 2? Something like this:
# Two separate add-steps tasks, each submitting its own step list to the
# cluster created by the `create_emr_cluster` task, plus one sensor per task.
# `dag`, STEP1 and STEP2 are defined elsewhere.
# NOTE: each template must stay on a single line inside its quotes -- the
# original paste had them hard-wrapped, which is a syntax error.
step1 = EmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)

step2 = EmrAddStepsOperator(
    task_id="step2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP2,
    dag=dag,
)

# Each watcher must pull the step id from the task that actually submitted
# the step. The original pulled from 'add_steps' in both watchers, but no
# task with that id appears in this snippet, and both watchers used the same
# expression, so step2_watcher could never follow a different step.
step1_watcher = EmrStepSensor(
    task_id="step_1watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

# [0] selects the first id that the `step2` task pushed; this presumes STEP2
# holds a single step -- adjust the index if it holds more.
step2_watcher = EmrStepSensor(
    task_id="step_2watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)
option 1: start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >>
step2 >> step2_watcher >> terminate
option 2: start_pipeline >> create_emr_cluster >> step1 >> step2 >>
step2_watcher >> terminate