This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 60cd5ad302d Introducing get_run_data_interval on LazyDeserializedDAG (#45211)
60cd5ad302d is described below

commit 60cd5ad302d9140650160a89d86288f145118fb1
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Dec 27 19:40:10 2024 +0530

    Introducing get_run_data_interval on LazyDeserializedDAG (#45211)
---
 airflow/serialization/serialized_objects.py | 18 ++++++++++++++--
 tests/dag_processing/test_collection.py     | 33 ++++++++++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 1f43f7865d1..3263f3dc5c3 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -41,7 +41,7 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallback
 from airflow.exceptions import AirflowException, SerializationError, TaskDeferred
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.connection import Connection
-from airflow.models.dag import DAG
+from airflow.models.dag import DAG, _get_model_data_interval
 from airflow.models.expandinput import (
     EXPAND_INPUT_EMPTY,
     create_expand_input,
@@ -95,13 +95,14 @@ from airflow.utils.types import NOTSET, ArgNotSet, AttributeRemoved
 if TYPE_CHECKING:
     from inspect import Parameter
 
+    from airflow.models import DagRun
     from airflow.models.baseoperatorlink import BaseOperatorLink
     from airflow.models.expandinput import ExpandInput
     from airflow.models.operator import Operator
     from airflow.sdk.definitions.node import DAGNode
     from airflow.serialization.json_schema import Validator
     from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
-    from airflow.timetables.base import DagRunInfo, Timetable
+    from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
 
     HAS_KUBERNETES: bool
     try:
@@ -1960,6 +1961,19 @@ class LazyDeserializedDAG(pydantic.BaseModel):
                     if isinstance(obj, of_type):
                         yield task["task_id"], obj
 
+    def get_run_data_interval(self, run: DagRun) -> DataInterval:
+        """Get the data interval of this run."""
+        if run.dag_id is not None and run.dag_id != self.dag_id:
+            raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {run.dag_id}")
+
+        data_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
+        # the older implementation has call to infer_automated_data_interval if data_interval is None, do we want to keep that or raise
+        # an exception?
+        if data_interval is None:
+            raise ValueError(f"Cannot calculate data interval for run {run}")
+
+        return data_interval
+
     if TYPE_CHECKING:
         access_control: Mapping[str, Mapping[str, Collection[str]] | Collection[str]] | None = pydantic.Field(
             init=False, default=None
diff --git a/tests/dag_processing/test_collection.py b/tests/dag_processing/test_collection.py
index ca435cc1a4f..a248904cbef 100644
--- a/tests/dag_processing/test_collection.py
+++ b/tests/dag_processing/test_collection.py
@@ -40,7 +40,7 @@ from airflow.dag_processing.collection import (
 )
 from airflow.exceptions import SerializationError
 from airflow.listeners.listener import get_listener_manager
-from airflow.models import DagModel, Trigger
+from airflow.models import DagModel, DagRun, Trigger
 from airflow.models.asset import (
     AssetActive,
     asset_trigger_association_table,
@@ -449,6 +449,27 @@ class TestUpdateDagParsingResults:
                 {"owners": ["airflow"]},
                 id="default-owner",
             ),
+            pytest.param(
+                {
+                    "_tasks_": [
+                        EmptyOperator(task_id="task", owner="owner1"),
+                        EmptyOperator(task_id="task2", owner="owner2"),
+                        EmptyOperator(task_id="task3"),
+                        EmptyOperator(task_id="task4", owner="owner2"),
+                    ],
+                    "schedule": "0 0 * * *",
+                    "catchup": False,
+                },
+                {
+                    "default_view": conf.get("webserver", "dag_default_view").lower(),
+                    "owners": ["owner1", "owner2"],
+                    "next_dagrun": tz.datetime(2020, 1, 5, 0, 0, 0),
+                    "next_dagrun_data_interval_start": tz.datetime(2020, 1, 5, 0, 0, 0),
+                    "next_dagrun_data_interval_end": tz.datetime(2020, 1, 6, 0, 0, 0),
+                    "next_dagrun_create_after": tz.datetime(2020, 1, 6, 0, 0, 0),
+                },
+                id="with-scheduled-dagruns",
+            ),
         ],
     )
     @pytest.mark.usefixtures("clean_db")
@@ -462,6 +483,16 @@ class TestUpdateDagParsingResults:
         if tasks:
             dag.add_tasks(tasks)
 
+        if attrs.pop("schedule", None):
+            dr_kwargs = {
+                "dag_id": "dag",
+                "run_type": "scheduled",
+                "data_interval": (dt, dt + timedelta(minutes=5)),
+            }
+            dr1 = DagRun(logical_date=dt, run_id="test_run_id_1", **dr_kwargs, start_date=dt)
+            session.add(dr1)
+            session.commit()
+
         update_dag_parsing_results_in_db([self.dag_to_lazy_serdag(dag)], {}, set(), session)
 
         orm_dag = session.get(DagModel, ("dag",))

Reply via email to