This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ccbe4e7ea A manual run can't look like a scheduled one (#28397)
7ccbe4e7ea is described below
commit 7ccbe4e7eaa529641052779a89e34d54c5a20f72
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Dec 22 09:54:26 2022 +0800
A manual run can't look like a scheduled one (#28397)
Fix https://github.com/apache/airflow/issues/27818
---
airflow/models/dag.py | 23 ++++++++++++++++++-----
tests/models/test_dag.py | 20 ++++++++++++++++++++
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c08a7f236f..6385634963 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2549,7 +2549,7 @@ class DAG(LoggingMixin):
external_trigger: bool | None = False,
conf: dict | None = None,
run_type: DagRunType | None = None,
- session=NEW_SESSION,
+ session: Session = NEW_SESSION,
dag_hash: str | None = None,
creating_job_id: int | None = None,
data_interval: tuple[datetime, datetime] | None = None,
@@ -2586,14 +2586,27 @@ class DAG(LoggingMixin):
else:
data_interval = self.infer_automated_data_interval(logical_date)
+ if run_type is None or isinstance(run_type, DagRunType):
+ pass
+ elif isinstance(run_type, str): # Compatibility: run_type used to be a str.
+ run_type = DagRunType(run_type)
+ else:
+ raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
+
if run_id: # Infer run_type from run_id if needed.
if not isinstance(run_id, str):
raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
- if not run_type:
- run_type = DagRunType.from_run_id(run_id)
+ inferred_run_type = DagRunType.from_run_id(run_id)
+ if run_type is None:
+ # No explicit type given, use the inferred type.
+ run_type = inferred_run_type
+ elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:
+ # Prevent a manual run from using an ID that looks like a scheduled run.
+ raise ValueError(
+ f"A {run_type.value} DAG run cannot use ID {run_id!r} since it "
+ f"is reserved for {inferred_run_type.value} runs"
+ )
elif run_type and logical_date is not None: # Generate run_id from run_type and execution_date.
- if not isinstance(run_type, DagRunType):
- raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
run_id = self.timetable.generate_run_id(
run_type=run_type, logical_date=logical_date, data_interval=data_interval
)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 6d3ca77bbe..9f1ba1e743 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3147,3 +3147,23 @@ def test_dag_uses_timetable_for_run_id(session):
)
assert dag_run.run_id == "abc"
+
+
+@pytest.mark.parametrize(
+ "run_id_type",
+ [DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, DagRunType.DATASET_TRIGGERED],
+)
+def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None:
+ dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
+ run_id = run_id_type.generate_run_id(DEFAULT_DATE)
+ with pytest.raises(ValueError) as ctx:
+ dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ run_id=run_id,
+ execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ state=DagRunState.QUEUED,
+ )
+ assert str(ctx.value) == (
+ f"A manual DAG run cannot use ID {run_id!r} since it is reserved for {run_id_type.value} runs"
+ )