This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 687b29baa28893e6156fa6fd7b89e02844ebfa0c
Author: yuqian90 <[email protected]>
AuthorDate: Sat Aug 15 06:39:57 2020 +0800

    Fix clear future recursive when ExternalTaskMarker is used (#9515)

    (cherry picked from commit 4454224b682e07a641f1a8878197170c167de03c)
---
 airflow/models/dag.py                      |  2 +-
 tests/sensors/test_external_task_sensor.py | 28 ++++++++++++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 94c6d2e..de610e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1055,7 +1055,7 @@ class DAG(BaseDag, LoggingMixin):
             instances = tis.all()
             for ti in instances:
                 if ti.operator == ExternalTaskMarker.__name__:
-                    ti.task = self.get_task(ti.task_id)
+                    ti.task = copy.copy(self.get_task(ti.task_id))
 
                     if recursion_depth == 0:
                         # Maximum recursion depth allowed is the recursion_depth of the first
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 0e5e960..e2a58ec 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -433,12 +433,12 @@ def assert_ti_state_equal(task_instance, state):
     assert task_instance.state == state
 
 
-def clear_tasks(dag_bag, dag, task):
+def clear_tasks(dag_bag, dag, task, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE):
     """
     Clear the task and its downstream tasks recursively for the dag in the given dagbag.
     """
     subdag = dag.sub_dag(task_regex="^{}$".format(task.task_id), include_downstream=True)
-    subdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, dag_bag=dag_bag)
+    subdag.clear(start_date=start_date, end_date=end_date, dag_bag=dag_bag)
 
 
 # pylint: disable=redefined-outer-name
@@ -456,6 +456,30 @@ def test_external_task_marker_transitive(dag_bag_ext):
     assert_ti_state_equal(ti_b_3, State.NONE)
 
 
+def test_external_task_marker_future(dag_bag_ext):
+    """
+    Test clearing tasks with no end_date. This is the case when users clear tasks with
+    Future, Downstream and Recursive selected.
+    """
+    date_0 = DEFAULT_DATE
+    date_1 = DEFAULT_DATE + timedelta(days=1)
+
+    tis_date_0 = run_tasks(dag_bag_ext, execution_date=date_0)
+    tis_date_1 = run_tasks(dag_bag_ext, execution_date=date_1)
+
+    dag_0 = dag_bag_ext.get_dag("dag_0")
+    task_a_0 = dag_0.get_task("task_a_0")
+    # This should clear all tasks on dag_0 to dag_3 on both date_0 and date_1
+    clear_tasks(dag_bag_ext, dag_0, task_a_0, end_date=None)
+
+    ti_a_0_date_0 = tis_date_0["task_a_0"]
+    ti_b_3_date_0 = tis_date_0["task_b_3"]
+    ti_b_3_date_1 = tis_date_1["task_b_3"]
+    assert_ti_state_equal(ti_a_0_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_1, State.NONE)
+
+
 def test_external_task_marker_exception(dag_bag_ext):
     """
     Clearing across multiple DAGs should raise AirflowException if more levels are being cleared
