This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d41bbbe47fc Set dag version on expanded tis (#47953)
d41bbbe47fc is described below

commit d41bbbe47fcca6205211d6d5d5dc5a9cc6f7ac96
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 20 02:17:23 2025 +0800

    Set dag version on expanded tis (#47953)
    
    The version is set to the unmapped ti's if available, latest otherwise.
---
 airflow/models/taskmap.py                       | 17 ++++++++++++++++-
 airflow/ui/src/components/DagVersionDetails.tsx |  2 +-
 tests/models/test_taskinstance.py               | 23 +++++++++++++++++++++++
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskmap.py b/airflow/models/taskmap.py
index fdd37f1f5b5..04bd6d974e1 100644
--- a/airflow/models/taskmap.py
+++ b/airflow/models/taskmap.py
@@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any
 from sqlalchemy import CheckConstraint, Column, ForeignKeyConstraint, Integer, String, func, or_, select
 
 from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies
+from airflow.models.dag_version import DagVersion
 from airflow.utils.db import exists_query
 from airflow.utils.sqlalchemy import ExtendedJSON, with_row_locks
 from airflow.utils.state import State, TaskInstanceState
@@ -207,6 +208,7 @@ class TaskMap(TaskInstanceDependencies):
                     task.log.debug("Deleting the original task instance: %s", unmapped_ti)
                     session.delete(unmapped_ti)
                 state = unmapped_ti.state
+            dag_version_id = unmapped_ti.dag_version_id
 
         if total_length is None or total_length < 1:
             # Nothing to fixup.
@@ -222,9 +224,22 @@ class TaskMap(TaskInstanceDependencies):
             )
             indexes_to_map = range(current_max_mapping + 1, total_length)
 
+        if unmapped_ti:
+            dag_version_id = unmapped_ti.dag_version_id
+        elif dag_version := DagVersion.get_latest_version(task.dag_id, session=session):
+            dag_version_id = dag_version.id
+        else:
+            dag_version_id = None
+
         for index in indexes_to_map:
             # TODO: Make more efficient with bulk_insert_mappings/bulk_save_mappings.
-            ti = TaskInstance(task, run_id=run_id, map_index=index, state=state)
+            ti = TaskInstance(
+                task,
+                run_id=run_id,
+                map_index=index,
+                state=state,
+                dag_version_id=dag_version_id,
+            )
             task.log.debug("Expanding TIs upserted %s", ti)
             task_instance_mutation_hook(ti)
             ti = session.merge(ti)
diff --git a/airflow/ui/src/components/DagVersionDetails.tsx b/airflow/ui/src/components/DagVersionDetails.tsx
index d1913feb52f..de46de97463 100644
--- a/airflow/ui/src/components/DagVersionDetails.tsx
+++ b/airflow/ui/src/components/DagVersionDetails.tsx
@@ -31,7 +31,7 @@ export const DagVersionDetails = ({ dagVersion }: { readonly dagVersion?: DagVer
       <Table.Body>
         <Table.Row>
           <Table.Cell>Version Number</Table.Cell>
-          <Table.Cell>v{dagVersion.version_number}</Table.Cell>
+          <Table.Cell>{dagVersion.version_number ? "v{dagVersion.version_number}" : "unknown"}</Table.Cell>
         </Table.Row>
         <Table.Row>
           <Table.Cell>Bundle Name</Table.Cell>
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 870f54cdf2c..f9a7d6fe12c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -4828,6 +4828,29 @@ class TestMappedTaskInstanceReceiveValue:
             ti.run()
         assert outputs == expected_outputs
 
+    def test_map_has_dag_version(self, dag_maker, session):
+        from airflow.models.dag_version import DagVersion
+
+        known_versions = {}
+
+        with dag_maker(dag_id="test", session=session) as dag:
+
+            @dag.task
+            def show(value, *, ti):
+                known_versions[ti.map_index] = ti.dag_version_id
+
+            show.expand(value=[1, 2, 3])
+
+        dag_version = session.merge(DagVersion(dag_id="test", bundle_name="test"))
+
+        dag_maker.create_dagrun(dag_version=dag_version)
+        task = dag.get_task("show")
+        for ti in session.scalars(select(TI)):
+            ti.refresh_from_task(task)
+            ti.run()
+
+        assert known_versions == {0: dag_version.id, 1: dag_version.id, 2: dag_version.id}
+
     @pytest.mark.parametrize(
         "upstream_return, expected_outputs",
         [

Reply via email to