This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ed51aff0562 Add DagRunType for operator (#63733)
ed51aff0562 is described below

commit ed51aff0562556fa1dd7d1ff6152018e70db03c2
Author: Henry Chen <[email protected]>
AuthorDate: Fri Apr 3 04:31:50 2026 +0800

    Add DagRunType for operator (#63733)
    
    * Execution API: Enforce OPERATOR run type for operator-triggered DAG runs
    
    * naming operator as operator_triggered
    
    * Fix trigger_dag syntax error after rebase
    
    * address unrelated changes
---
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  1 +
 .../core_api/openapi/v2-rest-api-generated.yaml    |  1 +
 .../api_fastapi/execution_api/routes/dag_runs.py   |  7 ++---
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  2 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  2 +-
 .../src/airflow/ui/src/components/RunTypeIcon.tsx  |  3 ++
 airflow-core/src/airflow/utils/types.py            |  1 +
 .../tests/unit/api/common/test_trigger_dag.py      | 19 +++++++++++++
 .../execution_api/versions/head/test_dag_runs.py   | 32 ++++++++++++++++++++--
 .../src/airflowctl/api/datamodels/generated.py     |  1 +
 .../src/airflow/sdk/api/datamodels/_generated.py   |  1 +
 11 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 0e58f9cba26..0ae06e44046 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -2191,6 +2191,7 @@ components:
       - backfill
       - scheduled
       - manual
+      - operator_triggered
       - asset_triggered
       - asset_materialization
       title: DagRunType
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 6a8b2a3d7e1..ace51229702 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11175,6 +11175,7 @@ components:
       - backfill
       - scheduled
       - manual
+      - operator_triggered
       - asset_triggered
       - asset_materialization
       title: DagRunType
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 69a06eca9b5..448d0b6945a 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -112,14 +112,12 @@ def trigger_dag_run(
             },
         )
 
-    # TODO: TriggerDagRunOperator also calls this route but creates MANUAL runs.
-    #  Consider a dedicated run type for operator-triggered runs.
-    if dm.allowed_run_types is not None and DagRunType.MANUAL not in dm.allowed_run_types:
+    if dm.allowed_run_types is not None and DagRunType.OPERATOR_TRIGGERED not in dm.allowed_run_types:
         raise HTTPException(
             status.HTTP_400_BAD_REQUEST,
             detail={
                 "reason": "denied_run_type",
-                "message": f"Dag with dag_id '{dag_id}' does not allow manual runs",
+                "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
             },
         )
 
@@ -127,6 +125,7 @@ def trigger_dag_run(
         trigger_dag(
             dag_id=dag_id,
             run_id=run_id,
+            run_type=DagRunType.OPERATOR_TRIGGERED,
             conf=payload.conf,
             logical_date=payload.logical_date,
             triggered_by=DagRunTriggeredByType.OPERATOR,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index cbc0ecaa172..c37bb840017 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3323,7 +3323,7 @@ export const $DagRunTriggeredByType = {
 
 export const $DagRunType = {
     type: 'string',
-    enum: ['backfill', 'scheduled', 'manual', 'asset_triggered', 'asset_materialization'],
+    enum: ['backfill', 'scheduled', 'manual', 'operator_triggered', 'asset_triggered', 'asset_materialization'],
     title: 'DagRunType',
     description: 'Class with DagRun types.'
 } as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 2567a22bacc..9a6c9fb7935 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -818,7 +818,7 @@ export type DagRunTriggeredByType = 'cli' | 'operator' | 'rest_api' | 'ui' | 'te
 /**
  * Class with DagRun types.
  */
-export type DagRunType = 'backfill' | 'scheduled' | 'manual' | 'asset_triggered' | 'asset_materialization';
+export type DagRunType = 'backfill' | 'scheduled' | 'manual' | 'operator_triggered' | 'asset_triggered' | 'asset_materialization';
 
 /**
  * DAG schedule reference serializer for assets.
diff --git a/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx b/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
index 1382a55b549..9eef6066a0b 100644
--- a/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
+++ b/airflow-core/src/airflow/ui/src/components/RunTypeIcon.tsx
@@ -18,6 +18,7 @@
  */
 import type { IconBaseProps } from "react-icons";
 import { HiDatabase } from "react-icons/hi";
+import { HiLightningBolt } from "react-icons/hi";
 import { MdPlayArrow, MdOutlineSchedule } from "react-icons/md";
 import { RiArrowGoBackFill } from "react-icons/ri";
 
@@ -41,6 +42,8 @@ export const RunTypeIcon = ({ runType, ...rest }: Props) => {
       return <RiArrowGoBackFill style={iconStyle} {...rest} />;
     case "manual":
       return <MdPlayArrow style={iconStyle} {...rest} />;
+    case "operator_triggered":
+      return <HiLightningBolt style={iconStyle} {...rest} />;
     case "scheduled":
       return <MdOutlineSchedule style={iconStyle} {...rest} />;
     default:
diff --git a/airflow-core/src/airflow/utils/types.py b/airflow-core/src/airflow/utils/types.py
index 391fbe5f71c..46fb18339e1 100644
--- a/airflow-core/src/airflow/utils/types.py
+++ b/airflow-core/src/airflow/utils/types.py
@@ -28,6 +28,7 @@ class DagRunType(str, enum.Enum):
     BACKFILL_JOB = "backfill"
     SCHEDULED = "scheduled"
     MANUAL = "manual"
+    OPERATOR_TRIGGERED = "operator_triggered"
     ASSET_TRIGGERED = "asset_triggered"
     ASSET_MATERIALIZATION = "asset_materialization"
 
diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py b/airflow-core/tests/unit/api/common/test_trigger_dag.py
index b94b42e95af..49a385865f4 100644
--- a/airflow-core/tests/unit/api/common/test_trigger_dag.py
+++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py
@@ -81,3 +81,22 @@ def test_trigger_dag_with_custom_run_type(dag_maker, session):
     )
 
     assert dag_run.run_type == DagRunType.ASSET_MATERIALIZATION
+
+
+def test_trigger_dag_operator_denied_when_only_manual_allowed(dag_maker, session):
+    with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *"):
+        EmptyOperator(task_id="mytask")
+    session.commit()
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == "TEST_DAG_1"))
+    dag_model.allowed_run_types = ["manual"]
+    session.commit()
+
+    with pytest.raises(
+        ValueError, match="Dag with dag_id: 'TEST_DAG_1' does not allow operator_triggered runs"
+    ):
+        trigger_dag(
+            dag_id="TEST_DAG_1",
+            run_type=DagRunType.OPERATOR_TRIGGERED,
+            triggered_by=DagRunTriggeredByType.OPERATOR,
+            session=session,
+        )
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index a2910313951..8b1fa606694 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -60,6 +60,7 @@ class TestDagRunTrigger:
         dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one()
         assert dag_run.conf == {"key1": "value1"}
         assert dag_run.logical_date == logical_date
+        assert dag_run.run_type == DagRunType.OPERATOR_TRIGGERED
 
     def test_trigger_dag_run_with_partition_key(self, client, session, dag_maker):
         dag_id = "test_trigger_dag_run_partition_key"
@@ -130,7 +131,7 @@ class TestDagRunTrigger:
         }
 
     def test_trigger_dag_run_denied_run_type(self, client, session, dag_maker):
-        """Test that a Dag with allowed_run_types excluding 'manual' cannot be triggered."""
+        """Test that a Dag with denied operator run type cannot be triggered."""
         dag_id = "test_trigger_dag_run_denied"
         run_id = "test_run_id"
         logical_date = timezone.datetime(2025, 2, 20)
@@ -151,7 +152,34 @@ class TestDagRunTrigger:
         assert response.status_code == 400
         assert response.json() == {
             "detail": {
-                "message": f"Dag with dag_id '{dag_id}' does not allow manual runs",
+                "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
+                "reason": "denied_run_type",
+            }
+        }
+
+    def test_trigger_dag_run_manual_denied_for_operator(self, client, session, dag_maker):
+        """Test that MANUAL-only allowed_run_types rejects operator-triggered runs."""
+        dag_id = "test_trigger_dag_run_manual_allowed"
+        run_id = "test_run_id"
+        logical_date = timezone.datetime(2025, 2, 20)
+
+        with dag_maker(dag_id=dag_id, session=session, serialized=True):
+            EmptyOperator(task_id="test_task")
+
+        session.execute(
+            update(DagModel).where(DagModel.dag_id == dag_id).values(allowed_run_types=["manual"])
+        )
+        session.commit()
+
+        response = client.post(
+            f"/execution/dag-runs/{dag_id}/{run_id}",
+            json={"logical_date": logical_date.isoformat()},
+        )
+
+        assert response.status_code == 400
+        assert response.json() == {
+            "detail": {
+                "message": f"Dag with dag_id '{dag_id}' does not allow operator-triggered runs",
                 "reason": "denied_run_type",
             }
         }
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 17aa78431a0..a0b32b7fbdd 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -395,6 +395,7 @@ class DagRunType(str, Enum):
     BACKFILL = "backfill"
     SCHEDULED = "scheduled"
     MANUAL = "manual"
+    OPERATOR_TRIGGERED = "operator_triggered"
     ASSET_TRIGGERED = "asset_triggered"
     ASSET_MATERIALIZATION = "asset_materialization"
 
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index e08f00562d3..d45f8239306 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -143,6 +143,7 @@ class DagRunType(str, Enum):
     BACKFILL = "backfill"
     SCHEDULED = "scheduled"
     MANUAL = "manual"
+    OPERATOR_TRIGGERED = "operator_triggered"
     ASSET_TRIGGERED = "asset_triggered"
     ASSET_MATERIALIZATION = "asset_materialization"
 

Reply via email to