This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new ace796a2733 [v3-2-test] Propagate triggering user to child DAG runs 
via TriggerDagRunOperator (#65747) (#66378)
ace796a2733 is described below

commit ace796a27337e0aa1dee4a0707a62179f39a89f7
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 18:23:25 2026 +0530

    [v3-2-test] Propagate triggering user to child DAG runs via TriggerDagRunOperator (#65747) (#66378)
    
    * [v3-2-test] Propagate triggering user to child DAG runs via TriggerDagRunOperator (#65747)
    
    Previously, when a task used TriggerDagRunOperator to trigger a child DAG,
      the child run's triggering_user_name was left unset and the UI displayed
      it as anonymous. The information was already captured on the parent
      DagRun but never forwarded through the Execution API.
    
      The POST /execution/dag-runs/{dag_id}/{run_id} endpoint now looks up the
      calling task instance's parent DagRun and forwards its
      triggering_user_name to the new child run. Chains of
      TriggerDagRunOperator runs now show the original human user end-to-end.
    (cherry picked from commit f06df11b29292e6b9c023d3308092360eb06dec1)
    
    Co-authored-by: Dheeraj Turaga <[email protected]>
    
    * [v3-2-test] Drop TIClaims usage in backport test
    
    The backport's test imports TIClaims from
    airflow.api_fastapi.execution_api.datamodels.token, but TIClaims
    was added on main by #63604 ("Validate task identity token claims
    with a typed schema") which was not backported to v3-2-test.
    
    On v3-2-test, TIToken.claims is still typed as `dict[str, Any]`
    (loose dict), so the test can construct it with a plain dict
    instead of the TIClaims model. Drops the TIClaims import and
    replaces TIClaims(scope="execution") with {"scope": "execution"}.
    
    This is the same shape of fix as #66743 — the backport's test
    references a symbol that exists only on main. Inlining the
    adapted construction avoids a feature backport (#63604) just to
    satisfy a test fixture's typed argument.
    
    ---------
    
    Co-authored-by: Dheeraj Turaga <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../api_fastapi/execution_api/routes/dag_runs.py   | 10 ++++++
 .../execution_api/versions/head/test_dag_runs.py   | 41 ++++++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 69a06eca9b5..f1fd01dac71 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -32,9 +32,12 @@ from airflow.api_fastapi.common.types import UtcDateTime
 from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
 from airflow.api_fastapi.execution_api.datamodels.dagrun import 
DagRunStateResponse, TriggerDAGRunPayload
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import CurrentTIToken
 from airflow.exceptions import DagRunAlreadyExists
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun as DagRunModel
+from airflow.models.taskinstance import TaskInstance
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -94,6 +97,7 @@ def trigger_dag_run(
     run_id: str,
     payload: TriggerDAGRunPayload,
     session: SessionDep,
+    token: TIToken = CurrentTIToken,
 ) -> None:
     """Trigger a Dag run."""
     dm = session.scalar(select(DagModel).where(~DagModel.is_stale, 
DagModel.dag_id == dag_id).limit(1))
@@ -123,6 +127,11 @@ def trigger_dag_run(
             },
         )
 
+    # Inherit triggering_user_name from the calling task's DagRun so chains of
+    # TriggerDagRunOperator preserve the original human user across child runs.
+    parent_ti = session.get(TaskInstance, token.id)
+    triggering_user_name = parent_ti.dag_run.triggering_user_name if parent_ti 
else None
+
     try:
         trigger_dag(
             dag_id=dag_id,
@@ -130,6 +139,7 @@ def trigger_dag_run(
             conf=payload.conf,
             logical_date=payload.logical_date,
             triggered_by=DagRunTriggeredByType.OPERATOR,
+            triggering_user_name=triggering_user_name,
             replace_microseconds=False,
             partition_key=payload.partition_key,
             note=payload.note,
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index a2910313951..8d25936f77b 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -19,9 +19,12 @@ from __future__ import annotations
 
 import pytest
 import time_machine
+from fastapi import Request
 from sqlalchemy import select, update
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
 from airflow.models import DagModel
 from airflow.models.dagrun import DagRun
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -191,6 +194,44 @@ class TestDagRunTrigger:
             }
         }
 
+    @pytest.mark.parametrize("parent_triggering_user_name", ["alice", None])
+    def test_trigger_dag_run_inherits_triggering_user_name(
+        self, client, exec_app, session, dag_maker, parent_triggering_user_name
+    ):
+        """Child DAG run inherits triggering_user_name from the calling task's 
parent run."""
+        parent_dag_id = "parent_dag_inherits"
+        parent_run_id = "parent_run"
+        child_dag_id = "child_dag_inherits"
+        child_run_id = "child_run"
+        logical_date = timezone.datetime(2025, 2, 20)
+
+        with dag_maker(dag_id=parent_dag_id, session=session, serialized=True):
+            EmptyOperator(task_id="trigger_task")
+        parent_run = dag_maker.create_dagrun(
+            run_id=parent_run_id, 
triggering_user_name=parent_triggering_user_name
+        )
+        parent_ti = parent_run.task_instances[0]
+
+        with dag_maker(dag_id=child_dag_id, session=session, serialized=True):
+            EmptyOperator(task_id="child_task")
+        session.commit()
+
+        async def auth_as_parent_ti(request: Request) -> TIToken:
+            return TIToken(id=parent_ti.id, claims={"scope": "execution"})
+
+        exec_app.dependency_overrides[require_auth] = auth_as_parent_ti
+        try:
+            response = client.post(
+                f"/execution/dag-runs/{child_dag_id}/{child_run_id}",
+                json={"logical_date": logical_date.isoformat()},
+            )
+        finally:
+            exec_app.dependency_overrides.pop(require_auth, None)
+
+        assert response.status_code == 204
+        child_run = session.scalars(select(DagRun).where(DagRun.run_id == 
child_run_id)).one()
+        assert child_run.triggering_user_name == parent_triggering_user_name
+
 
 class TestDagRunClear:
     def setup_method(self):

Reply via email to