This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 508ced8e82d Fix TriggerDagRunOperator fail_when_dag_is_paused on
Airflow 3.2+ (#67726)
508ced8e82d is described below
commit 508ced8e82db10c786b511b5b4e73f5f1748fb6f
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 29 19:08:49 2026 +0100
Fix TriggerDagRunOperator fail_when_dag_is_paused on Airflow 3.2+ (#67726)
On Airflow 3.x the paused check called DagModel.get_current(), which
accesses the ORM directly. Direct ORM access is blocked during task
execution, so fail_when_dag_is_paused=True raised "Direct database access
via the ORM is not allowed". Until now the option was stubbed to raise
NotImplementedError at operator construction on Airflow 3.x.
Resolve the target DAG's paused state through the task-SDK get_dag()
accessor (backed by the GetDag execution-API endpoint) and raise
DagIsPaused when paused. This works on Airflow 3.2.0+, where the endpoint
and accessor exist. On Airflow 3.0/3.1 the operator still fails fast at
construction with a clearer NotImplementedError pointing to the 3.2.0
requirement. The Airflow 2.x path is unchanged.
Fixes #56954.
---
.../providers/standard/operators/trigger_dagrun.py | 37 ++++++++++-----
.../unit/standard/operators/test_trigger_dagrun.py | 54 ++++++++++++++++++++--
2 files changed, 75 insertions(+), 16 deletions(-)
diff --git
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index 73884433af5..4aca1bfc4c1 100644
---
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -43,7 +43,12 @@ from airflow.providers.common.compat.sdk import (
)
from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.providers.standard.utils.openlineage import
safe_inject_openlineage_properties_into_dagrun_conf
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS,
BaseOperator, is_arg_set
+from airflow.providers.standard.version_compat import (
+ AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+ BaseOperator,
+ is_arg_set,
+)
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
@@ -145,7 +150,9 @@ class TriggerDagRunOperator(BaseOperator):
Default is ``[DagRunState.FAILED]``.
:param skip_when_already_exists: Set to true to mark the task as SKIPPED
if a DAG run of the triggered
DAG for the same logical date already exists.
- :param fail_when_dag_is_paused: If the dag to trigger is paused,
DagIsPaused will be raised.
+ :param fail_when_dag_is_paused: If the dag to trigger is paused,
DagIsPaused will be raised. On
+ Airflow 3.x this requires Airflow 3.2.0+ (it relies on the task-SDK
DAG state endpoint added then);
+ on Airflow 3.0/3.1 setting this raises ``NotImplementedError``.
:param deferrable: If waiting for completion, whether to defer the task
until done, default is ``False``.
:param openlineage_inject_parent_info: whether to include OpenLineage
metadata about the parent task
in the triggered DAG run's conf, enabling improved lineage tracking.
The metadata is only injected
@@ -218,8 +225,11 @@ class TriggerDagRunOperator(BaseOperator):
run_after = _validate_datetime_param("run_after", run_after)
self.logical_date = logical_date
self.run_after = run_after
- if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
- raise NotImplementedError("Setting `fail_when_dag_is_paused` not
yet supported for Airflow 3.x")
+ if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS and not
AIRFLOW_V_3_2_PLUS:
+ raise NotImplementedError(
+ "Setting `fail_when_dag_is_paused` requires Airflow 3.2.0+ on
Airflow 3.x "
+ "(it relies on the task-SDK DAG state endpoint added in
3.2.0)."
+ )
def execute(self, context: Context):
if self.logical_date is NOTSET:
@@ -267,14 +277,17 @@ class TriggerDagRunOperator(BaseOperator):
self.trigger_run_id = run_id
if self.fail_when_dag_is_paused:
- dag_model = DagModel.get_current(self.trigger_dag_id)
- if not dag_model:
- raise ValueError(f"Dag {self.trigger_dag_id} is not found")
- if dag_model.is_paused:
- # TODO: enable this when dag state endpoint available from
task sdk
- # if AIRFLOW_V_3_0_PLUS:
- # raise DagIsPaused(dag_id=self.trigger_dag_id)
- raise AirflowException(f"Dag {self.trigger_dag_id} is paused")
+ if AIRFLOW_V_3_0_PLUS:
+ # Tasks cannot access the ORM directly in Airflow 3.x; fetch
the DAG state via the
+ # task-SDK supervisor (GetDag execution-API endpoint,
available from Airflow 3.2.0).
+ if context["ti"].get_dag(self.trigger_dag_id).is_paused:
+ raise DagIsPaused(dag_id=self.trigger_dag_id)
+ else:
+ dag_model = DagModel.get_current(self.trigger_dag_id)
+ if not dag_model:
+ raise ValueError(f"Dag {self.trigger_dag_id} is not found")
+ if dag_model.is_paused:
+ raise AirflowException(f"Dag {self.trigger_dag_id} is
paused")
if AIRFLOW_V_3_0_PLUS:
self._trigger_dag_af_3(
diff --git
a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
index bcc5ad6153c..f4908336ab3 100644
--- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
+++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
@@ -38,7 +38,11 @@ from airflow.utils.state import DagRunState,
TaskInstanceState
from airflow.utils.types import DagRunType
from tests_common.test_utils.db import parse_and_sync_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import (
+ AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_1_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+)
if AIRFLOW_V_3_0_PLUS:
from airflow.providers.common.compat.sdk import DagRunTriggerException
@@ -332,10 +336,13 @@ class TestDagRunOperator:
{}, {"dag_id": "dag_id", "run_ids": ["run_id_1"],
"poll_interval": 15, "run_id_1": "success"}
)
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is
different for Airflow 2 & 3")
- def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
+ @pytest.mark.skipif(
+ not (AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_2_PLUS),
+ reason="On Airflow 3.0/3.1 the worker cannot resolve the DAG paused
state",
+ )
+ def
test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail_below_3_2(self):
with pytest.raises(
- NotImplementedError, match="Setting `fail_when_dag_is_paused` not
yet supported for Airflow 3.x"
+ NotImplementedError, match="Setting `fail_when_dag_is_paused`
requires Airflow 3.2.0"
):
TriggerDagRunOperator(
task_id="test_task",
@@ -344,6 +351,45 @@ class TestDagRunOperator:
fail_when_dag_is_paused=True,
)
+ @pytest.mark.skipif(
+ not AIRFLOW_V_3_2_PLUS, reason="Needs the task-SDK GetDag endpoint
added in Airflow 3.2.0"
+ )
+ def test_trigger_dagrun_fails_when_target_dag_is_paused(self):
+ from airflow.providers.standard.operators.trigger_dagrun import
DagIsPaused
+
+ task = TriggerDagRunOperator(
+ task_id="test_task",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ fail_when_dag_is_paused=True,
+ openlineage_inject_parent_info=False,
+ )
+ mock_ti = mock.MagicMock()
+ mock_ti.get_dag.return_value.is_paused = True
+
+ with pytest.raises(DagIsPaused, match=f"Dag {TRIGGERED_DAG_ID} is
paused"):
+ task.execute(context={"ti": mock_ti})
+
+ mock_ti.get_dag.assert_called_once_with(TRIGGERED_DAG_ID)
+
+ @pytest.mark.skipif(
+ not AIRFLOW_V_3_2_PLUS, reason="Needs the task-SDK GetDag endpoint
added in Airflow 3.2.0"
+ )
+ def test_trigger_dagrun_proceeds_when_target_dag_is_not_paused(self):
+ task = TriggerDagRunOperator(
+ task_id="test_task",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ fail_when_dag_is_paused=True,
+ openlineage_inject_parent_info=False,
+ )
+ mock_ti = mock.MagicMock()
+ mock_ti.get_dag.return_value.is_paused = False
+
+ with pytest.raises(DagRunTriggerException) as exc_info:
+ task.execute(context={"ti": mock_ti})
+
+ assert exc_info.value.trigger_dag_id == TRIGGERED_DAG_ID
+ mock_ti.get_dag.assert_called_once_with(TRIGGERED_DAG_ID)
+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is
different for Airflow 2 & 3")
def test_trigger_dagrun_with_str_conf(self):
"""