This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new ace796a2733 [v3-2-test] Propagate triggering user to child DAG runs
via TriggerDagRunOperator (#65747) (#66378)
ace796a2733 is described below
commit ace796a27337e0aa1dee4a0707a62179f39a89f7
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 18:23:25 2026 +0530
[v3-2-test] Propagate triggering user to child DAG runs via
TriggerDagRunOperator (#65747) (#66378)
* [v3-2-test] Propagate triggering user to child DAG runs via
TriggerDagRunOperator (#65747)
Previously, when a task used TriggerDagRunOperator to trigger a child DAG,
the child run's triggering_user_name was left unset and the UI displayed
it as anonymous. The information was already captured on the parent
DagRun but never forwarded through the Execution API.
The POST /execution/dag-runs/{dag_id}/{run_id} endpoint now looks up the
calling task instance's parent DagRun and forwards its
triggering_user_name
to the new child run. Chains of TriggerDagRunOperator runs now show the
original human user end-to-end
(cherry picked from commit f06df11b29292e6b9c023d3308092360eb06dec1)
Co-authored-by: Dheeraj Turaga <[email protected]>
* [v3-2-test] Drop TIClaims usage in backport test
The backport's test imports TIClaims from
airflow.api_fastapi.execution_api.datamodels.token, but TIClaims
was added on main by #63604 ("Validate task identity token claims
with a typed schema") which was not backported to v3-2-test.
On v3-2-test, TIToken.claims is still typed as `dict[str, Any]`
(loose dict), so the test can construct it with a plain dict
instead of the TIClaims model. Drops the TIClaims import and
replaces TIClaims(scope="execution") with {"scope": "execution"}.
This is the same shape of fix as #66743 — backport's test
references a symbol that exists only on main. Inlining the
adapted construction avoids a feature backport (#63604) just to
satisfy a test fixture's typed argument.
---------
Co-authored-by: Dheeraj Turaga <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../api_fastapi/execution_api/routes/dag_runs.py | 10 ++++++
.../execution_api/versions/head/test_dag_runs.py | 41 ++++++++++++++++++++++
2 files changed, 51 insertions(+)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 69a06eca9b5..f1fd01dac71 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -32,9 +32,12 @@ from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
from airflow.api_fastapi.execution_api.datamodels.dagrun import
DagRunStateResponse, TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import CurrentTIToken
from airflow.exceptions import DagRunAlreadyExists
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DagRunModel
+from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -94,6 +97,7 @@ def trigger_dag_run(
run_id: str,
payload: TriggerDAGRunPayload,
session: SessionDep,
+ token: TIToken = CurrentTIToken,
) -> None:
"""Trigger a Dag run."""
dm = session.scalar(select(DagModel).where(~DagModel.is_stale,
DagModel.dag_id == dag_id).limit(1))
@@ -123,6 +127,11 @@ def trigger_dag_run(
},
)
+ # Inherit triggering_user_name from the calling task's DagRun so chains of
+ # TriggerDagRunOperator preserve the original human user across child runs.
+ parent_ti = session.get(TaskInstance, token.id)
+ triggering_user_name = parent_ti.dag_run.triggering_user_name if parent_ti
else None
+
try:
trigger_dag(
dag_id=dag_id,
@@ -130,6 +139,7 @@ def trigger_dag_run(
conf=payload.conf,
logical_date=payload.logical_date,
triggered_by=DagRunTriggeredByType.OPERATOR,
+ triggering_user_name=triggering_user_name,
replace_microseconds=False,
partition_key=payload.partition_key,
note=payload.note,
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index a2910313951..8d25936f77b 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -19,9 +19,12 @@ from __future__ import annotations
import pytest
import time_machine
+from fastapi import Request
from sqlalchemy import select, update
from airflow._shared.timezones import timezone
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
from airflow.models import DagModel
from airflow.models.dagrun import DagRun
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -191,6 +194,44 @@ class TestDagRunTrigger:
}
}
+ @pytest.mark.parametrize("parent_triggering_user_name", ["alice", None])
+ def test_trigger_dag_run_inherits_triggering_user_name(
+ self, client, exec_app, session, dag_maker, parent_triggering_user_name
+ ):
+ """Child DAG run inherits triggering_user_name from the calling task's
parent run."""
+ parent_dag_id = "parent_dag_inherits"
+ parent_run_id = "parent_run"
+ child_dag_id = "child_dag_inherits"
+ child_run_id = "child_run"
+ logical_date = timezone.datetime(2025, 2, 20)
+
+ with dag_maker(dag_id=parent_dag_id, session=session, serialized=True):
+ EmptyOperator(task_id="trigger_task")
+ parent_run = dag_maker.create_dagrun(
+ run_id=parent_run_id,
triggering_user_name=parent_triggering_user_name
+ )
+ parent_ti = parent_run.task_instances[0]
+
+ with dag_maker(dag_id=child_dag_id, session=session, serialized=True):
+ EmptyOperator(task_id="child_task")
+ session.commit()
+
+ async def auth_as_parent_ti(request: Request) -> TIToken:
+ return TIToken(id=parent_ti.id, claims={"scope": "execution"})
+
+ exec_app.dependency_overrides[require_auth] = auth_as_parent_ti
+ try:
+ response = client.post(
+ f"/execution/dag-runs/{child_dag_id}/{child_run_id}",
+ json={"logical_date": logical_date.isoformat()},
+ )
+ finally:
+ exec_app.dependency_overrides.pop(require_auth, None)
+
+ assert response.status_code == 204
+ child_run = session.scalars(select(DagRun).where(DagRun.run_id ==
child_run_id)).one()
+ assert child_run.triggering_user_name == parent_triggering_user_name
+
class TestDagRunClear:
def setup_method(self):