This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7722b6f226 Ensure dynamic tasks inside dynamic task group only marks
the (#32354)
7722b6f226 is described below
commit 7722b6f226e9db3a89b01b89db5fdb7a1ab2256f
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Wed Jul 5 14:08:29 2023 +0530
Ensure dynamic tasks inside dynamic task group only marks the (#32354)
corresponding EmptyOperator in downstream as success.
---
airflow/models/dagrun.py | 4 ++--
tests/models/test_dagrun.py | 46 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8f3b3a3301..b0b56f3c6c 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1338,7 +1338,7 @@ class DagRun(Base, LoggingMixin):
and not ti.task.on_success_callback
and not ti.task.outlets
):
- dummy_ti_ids.append(ti.task_id)
+ dummy_ti_ids.append((ti.task_id, ti.map_index))
else:
schedulable_ti_ids.append((ti.task_id, ti.map_index))
@@ -1369,7 +1369,7 @@ class DagRun(Base, LoggingMixin):
.where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
- TI.task_id.in_(dummy_ti_ids_chunk),
+ tuple_in_condition((TI.task_id, TI.map_index),
dummy_ti_ids_chunk),
)
.values(
state=TaskInstanceState.SUCCESS,
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5d7db547d3..c90ec1aef3 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1811,6 +1811,52 @@ def test_mapped_task_group_expands_at_create(dag_maker,
session):
]
+def test_mapped_task_group_empty_operator(dag_maker, session):
+ """
+ Test that dynamic task inside a dynamic task group only marks
+ the corresponding downstream EmptyOperator as success.
+ """
+
+ literal = [1, 2, 3]
+
+ with dag_maker(session=session) as dag:
+
+ @task_group
+ def tg(x):
+ @task
+ def t1(x):
+ return x
+
+ t2 = EmptyOperator(task_id="t2")
+
+ @task
+ def t3(x):
+ return x
+
+ t1(x) >> t2 >> t3(x)
+
+ tg.expand(x=literal)
+
+ dr = dag_maker.create_dagrun()
+
+ t2_task = dag.get_task("tg.t2")
+ t2_0 = dr.get_task_instance(task_id="tg.t2", map_index=0)
+ t2_0.refresh_from_task(t2_task)
+ assert t2_0.state is None
+
+ t2_1 = dr.get_task_instance(task_id="tg.t2", map_index=1)
+ t2_1.refresh_from_task(t2_task)
+ assert t2_1.state is None
+
+ dr.schedule_tis([t2_0])
+
+ t2_0 = dr.get_task_instance(task_id="tg.t2", map_index=0)
+ assert t2_0.state == TaskInstanceState.SUCCESS
+
+ t2_1 = dr.get_task_instance(task_id="tg.t2", map_index=1)
+ assert t2_1.state is None
+
+
def test_ti_scheduling_mapped_zero_length(dag_maker, session):
with dag_maker(session=session):
task = BaseOperator(task_id="task_1")