This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d41bbbe47fc Set dag version on expanded tis (#47953)
d41bbbe47fc is described below
commit d41bbbe47fcca6205211d6d5d5dc5a9cc6f7ac96
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 20 02:17:23 2025 +0800
Set dag version on expanded tis (#47953)
The version is set to the unmapped ti's if available, latest otherwise.
---
airflow/models/taskmap.py | 17 ++++++++++++++++-
airflow/ui/src/components/DagVersionDetails.tsx | 2 +-
tests/models/test_taskinstance.py | 23 +++++++++++++++++++++++
3 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/airflow/models/taskmap.py b/airflow/models/taskmap.py
index fdd37f1f5b5..04bd6d974e1 100644
--- a/airflow/models/taskmap.py
+++ b/airflow/models/taskmap.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any
from sqlalchemy import CheckConstraint, Column, ForeignKeyConstraint, Integer,
String, func, or_, select
from airflow.models.base import COLLATION_ARGS, ID_LEN,
TaskInstanceDependencies
+from airflow.models.dag_version import DagVersion
from airflow.utils.db import exists_query
from airflow.utils.sqlalchemy import ExtendedJSON, with_row_locks
from airflow.utils.state import State, TaskInstanceState
@@ -207,6 +208,7 @@ class TaskMap(TaskInstanceDependencies):
task.log.debug("Deleting the original task instance: %s",
unmapped_ti)
session.delete(unmapped_ti)
state = unmapped_ti.state
+ dag_version_id = unmapped_ti.dag_version_id
if total_length is None or total_length < 1:
# Nothing to fixup.
@@ -222,9 +224,22 @@ class TaskMap(TaskInstanceDependencies):
)
indexes_to_map = range(current_max_mapping + 1, total_length)
+ if unmapped_ti:
+ dag_version_id = unmapped_ti.dag_version_id
+ elif dag_version := DagVersion.get_latest_version(task.dag_id,
session=session):
+ dag_version_id = dag_version.id
+ else:
+ dag_version_id = None
+
for index in indexes_to_map:
# TODO: Make more efficient with
bulk_insert_mappings/bulk_save_mappings.
- ti = TaskInstance(task, run_id=run_id, map_index=index,
state=state)
+ ti = TaskInstance(
+ task,
+ run_id=run_id,
+ map_index=index,
+ state=state,
+ dag_version_id=dag_version_id,
+ )
task.log.debug("Expanding TIs upserted %s", ti)
task_instance_mutation_hook(ti)
ti = session.merge(ti)
diff --git a/airflow/ui/src/components/DagVersionDetails.tsx
b/airflow/ui/src/components/DagVersionDetails.tsx
index d1913feb52f..de46de97463 100644
--- a/airflow/ui/src/components/DagVersionDetails.tsx
+++ b/airflow/ui/src/components/DagVersionDetails.tsx
@@ -31,7 +31,7 @@ export const DagVersionDetails = ({ dagVersion }: { readonly
dagVersion?: DagVer
<Table.Body>
<Table.Row>
<Table.Cell>Version Number</Table.Cell>
- <Table.Cell>v{dagVersion.version_number}</Table.Cell>
+ <Table.Cell>{dagVersion.version_number ?
"v{dagVersion.version_number}" : "unknown"}</Table.Cell>
</Table.Row>
<Table.Row>
<Table.Cell>Bundle Name</Table.Cell>
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 870f54cdf2c..f9a7d6fe12c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -4828,6 +4828,29 @@ class TestMappedTaskInstanceReceiveValue:
ti.run()
assert outputs == expected_outputs
+ def test_map_has_dag_version(self, dag_maker, session):
+ from airflow.models.dag_version import DagVersion
+
+ known_versions = {}
+
+ with dag_maker(dag_id="test", session=session) as dag:
+
+ @dag.task
+ def show(value, *, ti):
+ known_versions[ti.map_index] = ti.dag_version_id
+
+ show.expand(value=[1, 2, 3])
+
+ dag_version = session.merge(DagVersion(dag_id="test",
bundle_name="test"))
+
+ dag_maker.create_dagrun(dag_version=dag_version)
+ task = dag.get_task("show")
+ for ti in session.scalars(select(TI)):
+ ti.refresh_from_task(task)
+ ti.run()
+
+ assert known_versions == {0: dag_version.id, 1: dag_version.id, 2:
dag_version.id}
+
@pytest.mark.parametrize(
"upstream_return, expected_outputs",
[