This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dd900b6301 Set logical_date and data_interval to None for 
asset-triggered dags and forbid them to be accessed in context/template (#46460)
8dd900b6301 is described below

commit 8dd900b63014b5e80f42660e090577ad8efc8a83
Author: Wei Lee <[email protected]>
AuthorDate: Thu Feb 13 15:42:00 2025 +0800

    Set logical_date and data_interval to None for asset-triggered dags and 
forbid them to be accessed in context/template (#46460)
    
    * style(dag): improve type annotation
    
    * refactor(dag): rename by_dag as adrq_by_dag
    
    * feat(scheduler_job_runner): set logical_date, data_interval as none when 
creating dag runs for asset triggered dag
    
    * test(pytest_plugin): set run_after to now if data_interval is None
    
    * test(scheduler_job_runner): set logical_date and data_interval of asset 
triggered dag runs to none
    
    * feat(dagrun): order queued and running dag runs by run_after instead of 
logical_date
    
    * feat(dag): get task_instances based on run_after instead of logical_date
    
    * feat(taskinstance): change log_url base_date to use run_after instead of 
logical_date
    
    * test(test_common): rewrite create_dagrun as logical_date is now nullable
    
    * feat(dag): get the last_dagrun by run_after
    
    * feat(taskinstance): pass base_date to TaskInstance.log_uri only when 
logical_date exists
    
    * feat(www): fix last_dag_run through run_after
    
    * feat(www): fetch last_dag_runs using run_after instead of logical_date in 
task_stat
    
    * feat(www): fetch dag_run through run_after instead of logical_date in 
grid_data
    
    * feat(task_sdk): remove data_interval_start, data_interval_end, 
prev_data_interval_start_success, prev_data_interval_end_success for dag_run 
that has no data_interval
    
    * test(pytest_plugin): add DagRun.DATASET_TRIGERED for backward compat
    
    * Revert "test(test_common): rewrite create_dagrun as logical_date is now 
nullable"
    
    This reverts commit a3ba5a1021d203f324dab1371d85ad11a7c55e7a.
    
    * Revert "feat(dag): get task_instances based on run_after instead of 
logical_date"
    
    This reverts commit a8be4a7ebb424b194c8dd3183243b2516e4edb66.
    
    * Revert "feat(dag): get the last_dagrun by run_after"
    
    This reverts commit a1b4e6e8299b3e0e800db088bc95b30519fe6b1a.
    
    * feat(timetable): remove AssetTriggeredTimetable.data_interval_for_events
    
    * feat(scheduler_job_runner): use start_date directly for asset triggered 
dag
    
    * Revert "feat(www): fetch dag_run through run_after instead of 
logical_date in grid_data"
    
    This reverts commit 24a79d6d78820b5ed308b09ec9a65870e9c88384.
    
    * Revert "feat(www): fetch last_dag_runs using run_after instead of 
logical_date in task_stat"
    
    This reverts commit 0f04880ece0a7ac42b97597bfa6a6fbe3375cb3b.
    
    * Revert "feat(www): fix last_dag_run through run_after"
    
    This reverts commit 5c90113b68fdf47a8bfe232e22098b4eaa0e7652.
    
    * refactor(task_runner): merge the data_interval keys with logical_date 
check logic
    
    * feat(scheduler_job_runner): simplify _create_dag_runs_asset_triggered 
logic as we don't need to check existing logical_date for asset triggered dag 
runs
---
 airflow/jobs/scheduler_job_runner.py               | 132 +++++++++------------
 airflow/models/dag.py                              |  18 +--
 airflow/models/dagrun.py                           |   4 +-
 airflow/timetables/simple.py                       |  24 +---
 scripts/ci/pre_commit/template_context_key_sync.py |  13 +-
 .../src/airflow/sdk/execution_time/task_runner.py  |  20 ++--
 tests/jobs/test_scheduler_job.py                   |  14 +--
 tests/timetables/test_assets_timetable.py          |  18 ---
 tests_common/pytest_plugin.py                      |   4 +-
 9 files changed, 104 insertions(+), 143 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 3703268b07e..4d81e464e68 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -84,6 +84,7 @@ if TYPE_CHECKING:
     from datetime import datetime
     from types import FrameType
 
+    from pendulum.datetime import DateTime
     from sqlalchemy.orm import Query, Session
 
     from airflow.executors.executor_utils import ExecutorName
@@ -502,8 +503,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
                         if current_task_concurrency >= task_concurrency_limit:
                             self.log.info(
-                                "Not executing %s since the task concurrency 
for"
-                                " this task has been reached.",
+                                "Not executing %s since the task concurrency 
for this task has been reached.",
                                 task_instance,
                             )
                             starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
@@ -1206,7 +1206,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         non_asset_dags = 
all_dags_needing_dag_runs.difference(asset_triggered_dags)
         self._create_dag_runs(non_asset_dags, session)
         if asset_triggered_dags:
-            self._create_dag_runs_asset_triggered(asset_triggered_dags, 
asset_triggered_dag_info, session)
+            self._create_dag_runs_asset_triggered(
+                dag_models=asset_triggered_dags,
+                asset_triggered_dag_info=asset_triggered_dag_info,
+                session=session,
+            )
 
         # commit the session - Release the write lock on DagModel table.
         guard.commit()
@@ -1325,21 +1329,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         session: Session,
     ) -> None:
         """For DAGs that are triggered by assets, create dag runs."""
-        # Bulk Fetch DagRuns with dag_id and logical_date same
-        # as DagModel.dag_id and DagModel.next_dagrun
-        # This list is used to verify if the DagRun already exist so that we 
don't attempt to create
-        # duplicate dag runs
-        logical_dates = {
-            dag_id: timezone.coerce_datetime(last_time)
-            for dag_id, (_, last_time) in asset_triggered_dag_info.items()
+        triggered_dates: dict[str, DateTime] = {
+            dag_id: timezone.coerce_datetime(last_asset_event_time)
+            for dag_id, (_, last_asset_event_time) in 
asset_triggered_dag_info.items()
         }
-        existing_dagruns: set[tuple[str, timezone.DateTime]] = set(
-            session.execute(
-                select(DagRun.dag_id, DagRun.logical_date).where(
-                    tuple_(DagRun.dag_id, 
DagRun.logical_date).in_(logical_dates.items())
-                )
-            )
-        )
 
         for dag_model in dag_models:
             dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
@@ -1356,64 +1349,50 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             latest_dag_version = DagVersion.get_latest_version(dag.dag_id, 
session=session)
 
-            # Explicitly check if the DagRun already exists. This is an edge 
case
-            # where a Dag Run is created but `DagModel.next_dagrun` and 
`DagModel.next_dagrun_create_after`
-            # are not updated.
-            # We opted to check DagRun existence instead
-            # of catching an Integrity error and rolling back the session i.e
-            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
-            # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
-            # instead of falling in a loop of Integrity Error.
-            logical_date = logical_dates[dag.dag_id]
-            if (dag.dag_id, logical_date) not in existing_dagruns:
-                previous_dag_run = session.scalar(
-                    select(DagRun)
-                    .where(
-                        DagRun.dag_id == dag.dag_id,
-                        DagRun.logical_date < logical_date,
-                        DagRun.run_type == DagRunType.ASSET_TRIGGERED,
-                    )
-                    .order_by(DagRun.logical_date.desc())
-                    .limit(1)
-                )
-                asset_event_filters = [
-                    DagScheduleAssetReference.dag_id == dag.dag_id,
-                    AssetEvent.timestamp <= logical_date,
-                ]
-                if previous_dag_run:
-                    asset_event_filters.append(AssetEvent.timestamp > 
previous_dag_run.logical_date)
-
-                asset_events = session.scalars(
-                    select(AssetEvent)
-                    .join(
-                        DagScheduleAssetReference,
-                        AssetEvent.asset_id == 
DagScheduleAssetReference.asset_id,
-                    )
-                    .where(*asset_event_filters)
-                ).all()
-
-                data_interval = 
dag.timetable.data_interval_for_events(logical_date, asset_events)
-                dag_run = dag.create_dagrun(
-                    run_id=DagRun.generate_run_id(
-                        run_type=DagRunType.ASSET_TRIGGERED,
-                        logical_date=logical_date,
-                        run_after=max(logical_dates.values()),
-                    ),
-                    logical_date=logical_date,
-                    data_interval=data_interval,
-                    run_after=max(logical_dates.values()),
-                    run_type=DagRunType.ASSET_TRIGGERED,
-                    triggered_by=DagRunTriggeredByType.ASSET,
-                    dag_version=latest_dag_version,
-                    state=DagRunState.QUEUED,
-                    creating_job_id=self.job.id,
-                    session=session,
+            triggered_date = triggered_dates[dag.dag_id]
+            previous_dag_run = session.scalar(
+                select(DagRun)
+                .where(
+                    DagRun.dag_id == dag.dag_id,
+                    DagRun.run_after < triggered_date,
+                    DagRun.run_type == DagRunType.ASSET_TRIGGERED,
                 )
-                Stats.incr("asset.triggered_dagruns")
-                dag_run.consumed_asset_events.extend(asset_events)
-                session.execute(
-                    
delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
+                .order_by(DagRun.run_after.desc())
+                .limit(1)
+            )
+            asset_event_filters = [
+                DagScheduleAssetReference.dag_id == dag.dag_id,
+                AssetEvent.timestamp <= triggered_date,
+            ]
+            if previous_dag_run:
+                asset_event_filters.append(AssetEvent.timestamp > 
previous_dag_run.run_after)
+
+            asset_events = session.scalars(
+                select(AssetEvent)
+                .join(
+                    DagScheduleAssetReference,
+                    AssetEvent.asset_id == DagScheduleAssetReference.asset_id,
                 )
+                .where(*asset_event_filters)
+            ).all()
+
+            dag_run = dag.create_dagrun(
+                run_id=DagRun.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=triggered_date
+                ),
+                logical_date=None,
+                data_interval=None,
+                run_after=triggered_date,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
+                dag_version=latest_dag_version,
+                state=DagRunState.QUEUED,
+                creating_job_id=self.job.id,
+                session=session,
+            )
+            Stats.incr("asset.triggered_dagruns")
+            dag_run.consumed_asset_events.extend(asset_events)
+            
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_run.dag_id))
 
     def _should_update_dag_next_dagruns(
         self,
@@ -1486,7 +1465,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
             dag_run.state = DagRunState.RUNNING
             dag_run.start_date = timezone.utcnow()
-            if dag.timetable.periodic and not dag_run.external_trigger and 
dag_run.clear_number < 1:
+            if (
+                dag.timetable.periodic
+                and dag_run.triggered_by != DagRunTriggeredByType.ASSET
+                and not dag_run.external_trigger
+                and dag_run.clear_number < 1
+            ):
                 # TODO: Logically, this should be DagRunInfo.run_after, but the
                 # information is not stored on a DagRun, only before the actual
                 # execution on DagModel.next_dagrun_create_after. We should add
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8ae5935bad3..5ebe1c2cb2f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -304,8 +304,7 @@ def _convert_max_consecutive_failed_dag_runs(val: int) -> 
int:
         val = airflow_conf.getint("core", 
"max_consecutive_failed_dag_runs_per_dag")
     if val < 0:
         raise ValueError(
-            f"Invalid max_consecutive_failed_dag_runs: {val}."
-            f"Requires max_consecutive_failed_dag_runs >= 0"
+            f"Invalid max_consecutive_failed_dag_runs: {val}. Requires 
max_consecutive_failed_dag_runs >= 0"
         )
     return val
 
@@ -2336,11 +2335,11 @@ class DagModel(Base):
                 return None
 
         # this loads all the ADRQ records.... may need to limit num dags
-        by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
+        adrq_by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
         for r in session.scalars(select(AssetDagRunQueue)):
-            by_dag[r.target_dag_id].append(r)
+            adrq_by_dag[r.target_dag_id].append(r)
         dag_statuses: dict[str, dict[AssetUniqueKey, bool]] = {}
-        for dag_id, records in by_dag.items():
+        for dag_id, records in adrq_by_dag.items():
             dag_statuses[dag_id] = {AssetUniqueKey.from_asset(x.asset): True 
for x in records}
         ser_dags = 
SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), 
session=session)
 
@@ -2348,14 +2347,15 @@ class DagModel(Base):
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
             if not dag_ready(dag_id, 
cond=ser_dag.dag.timetable.asset_condition, statuses=statuses):
-                del by_dag[dag_id]
+                del adrq_by_dag[dag_id]
                 del dag_statuses[dag_id]
         del dag_statuses
-        asset_triggered_dag_info = {}
-        for dag_id, records in by_dag.items():
+        # TODO: make it more readable (rename it or make it attrs, dataclass 
or etc.)
+        asset_triggered_dag_info: dict[str, tuple[datetime, datetime]] = {}
+        for dag_id, records in adrq_by_dag.items():
             times = sorted(x.created_at for x in records)
             asset_triggered_dag_info[dag_id] = (times[0], times[-1])
-        del by_dag
+        del adrq_by_dag
         asset_triggered_dag_ids = set(asset_triggered_dag_info.keys())
         if asset_triggered_dag_ids:
             exclusion_list = set(
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 808543d0b50..0035a68b502 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -450,7 +450,7 @@ class DagRun(Base, LoggingMixin):
             .order_by(
                 nulls_first(BackfillDagRun.sort_ordinal, session=session),
                 nulls_first(cls.last_scheduling_decision, session=session),
-                cls.logical_date,
+                cls.run_after,
             )
             .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
         )
@@ -535,7 +535,7 @@ class DagRun(Base, LoggingMixin):
                 nulls_first(BackfillDagRun.sort_ordinal, session=session),
                 nulls_first(cls.last_scheduling_decision, session=session),
                 nulls_first(running_drs.c.num_running, session=session),  # 
many running -> lower priority
-                cls.logical_date,
+                cls.run_after,
             )
             .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
         )
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 45574daa37e..b221b11b515 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from collections.abc import Collection, Sequence
+from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -25,7 +25,6 @@ from airflow.utils import timezone
 if TYPE_CHECKING:
     from pendulum import DateTime
 
-    from airflow.models.asset import AssetEvent
     from airflow.sdk.definitions.asset import BaseAsset
     from airflow.timetables.base import TimeRestriction
     from airflow.utils.types import DagRunType
@@ -202,27 +201,6 @@ class AssetTriggeredTimetable(_TrivialTimetable):
 
         return DagRun.generate_run_id(run_type=run_type, 
logical_date=logical_date, run_after=run_after)
 
-    def data_interval_for_events(
-        self,
-        logical_date: DateTime,
-        events: Collection[AssetEvent],
-    ) -> DataInterval:
-        if not events:
-            return DataInterval(logical_date, logical_date)
-
-        start_dates, end_dates = [], []
-        for event in events:
-            if event.source_dag_run is not None:
-                start_dates.append(event.source_dag_run.data_interval_start)
-                end_dates.append(event.source_dag_run.data_interval_end)
-            else:
-                start_dates.append(event.timestamp)
-                end_dates.append(event.timestamp)
-
-        start = min(start_dates)
-        end = max(end_dates)
-        return DataInterval(start, end)
-
     def next_dagrun_info(
         self,
         *,
diff --git a/scripts/ci/pre_commit/template_context_key_sync.py 
b/scripts/ci/pre_commit/template_context_key_sync.py
index 49d768541a1..270a3a695b2 100755
--- a/scripts/ci/pre_commit/template_context_key_sync.py
+++ b/scripts/ci/pre_commit/template_context_key_sync.py
@@ -33,7 +33,18 @@ CONTEXT_HINT = ROOT_DIR.joinpath("task_sdk", "src", 
"airflow", "sdk", "definitio
 TEMPLATES_REF_RST = ROOT_DIR.joinpath("docs", "apache-airflow", 
"templates-ref.rst")
 
 # These are only conditionally set
-IGNORE = {"ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz", 
"logical_date"}
+IGNORE = {
+    "ds",
+    "ds_nodash",
+    "ts",
+    "ts_nodash",
+    "ts_nodash_with_tz",
+    "logical_date",
+    "data_interval_end",
+    "data_interval_start",
+    "prev_data_interval_start_success",
+    "prev_data_interval_end_success",
+}
 
 
 def _iter_template_context_keys_from_original_return() -> typing.Iterator[str]:
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index a100452791b..3b8801ff9fc 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -155,16 +155,8 @@ class RuntimeTaskInstance(TaskInstance):
             context_from_server: Context = {
                 # TODO: Assess if we need to pass these through 
timezone.coerce_datetime
                 "dag_run": dag_run,  # type: ignore[typeddict-item]  # 
Removable after #46522
-                "data_interval_end": dag_run.data_interval_end,
-                "data_interval_start": dag_run.data_interval_start,
                 "task_instance_key_str": 
f"{self.task.dag_id}__{self.task.task_id}__{dag_run.run_id}",
                 "task_reschedule_count": 
self._ti_context_from_server.task_reschedule_count,
-                "prev_data_interval_start_success": lazy_object_proxy.Proxy(
-                    lambda: 
get_previous_dagrun_success(self.id).data_interval_start
-                ),
-                "prev_data_interval_end_success": lazy_object_proxy.Proxy(
-                    lambda: 
get_previous_dagrun_success(self.id).data_interval_end
-                ),
                 "prev_start_date_success": lazy_object_proxy.Proxy(
                     lambda: get_previous_dagrun_success(self.id).start_date
                 ),
@@ -180,8 +172,10 @@ class RuntimeTaskInstance(TaskInstance):
                 ts = logical_date.isoformat()
                 ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
                 ts_nodash_with_tz = ts.replace("-", "").replace(":", "")
+                # logical_date and data_interval either coexist or be None 
together
                 context.update(
                     {
+                        # keys that depend on logical_date
                         "logical_date": logical_date,
                         "ds": ds,
                         "ds_nodash": ds_nodash,
@@ -189,8 +183,18 @@ class RuntimeTaskInstance(TaskInstance):
                         "ts": ts,
                         "ts_nodash": ts_nodash,
                         "ts_nodash_with_tz": ts_nodash_with_tz,
+                        # keys that depend on data_interval
+                        "data_interval_end": dag_run.data_interval_end,
+                        "data_interval_start": dag_run.data_interval_start,
+                        "prev_data_interval_start_success": 
lazy_object_proxy.Proxy(
+                            lambda: 
get_previous_dagrun_success(self.id).data_interval_start
+                        ),
+                        "prev_data_interval_end_success": 
lazy_object_proxy.Proxy(
+                            lambda: 
get_previous_dagrun_success(self.id).data_interval_end
+                        ),
                     }
                 )
+
             if from_server.upstream_map_indexes is not None:
                 # We stash this in here for later use, but we purposefully 
don't want to document it's
                 # existence. Should this be a private attribute on RuntimeTI 
instead perhaps?
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 76b1ceb0e9a..24aeb44c5a7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3880,8 +3880,8 @@ class TestSchedulerJob:
         assert list(map(dict_from_obj, created_run.consumed_asset_events)) == 
list(
             map(dict_from_obj, [event1, event2])
         )
-        assert created_run.data_interval_start == DEFAULT_DATE + 
timedelta(days=5)
-        assert created_run.data_interval_end == DEFAULT_DATE + 
timedelta(days=11)
+        assert created_run.data_interval_start is None
+        assert created_run.data_interval_end is None
         # dag2 ADRQ record should still be there since the dag run was *not* 
triggered
         assert 
session.query(AssetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one() is 
not None
         # dag2 should not be triggered since it depends on both asset 1  and 2
@@ -3889,7 +3889,7 @@ class TestSchedulerJob:
         # dag3 ADRQ record should be deleted since the dag run was triggered
         assert 
session.query(AssetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none()
 is None
 
-        assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
+        assert created_run.creating_job_id == scheduler_job.id
 
     @pytest.mark.need_serialized_dag
     @pytest.mark.parametrize(
@@ -4997,7 +4997,7 @@ class TestSchedulerJob:
         dag_version = DagVersion.get_latest_version(dag.dag_id)
         for i in range(16):
             dr = dag_maker.create_dagrun(
-                run_id=f"dr2_run_{i+1}",
+                run_id=f"dr2_run_{i + 1}",
                 state=State.RUNNING,
                 logical_date=date,
                 dag_version=dag_version,
@@ -5007,7 +5007,7 @@ class TestSchedulerJob:
         date = dr16[0].logical_date + timedelta(hours=1)
         for i in range(16, 32):
             dr = dag_maker.create_dagrun(
-                run_id=f"dr2_run_{i+1}",
+                run_id=f"dr2_run_{i + 1}",
                 state=State.QUEUED,
                 logical_date=date,
                 dag_version=dag_version,
@@ -5021,7 +5021,7 @@ class TestSchedulerJob:
         dag_version = DagVersion.get_latest_version(dag.dag_id)
         for i in range(16):
             dr = dag_maker.create_dagrun(
-                run_id=f"dr3_run_{i+1}",
+                run_id=f"dr3_run_{i + 1}",
                 state=State.RUNNING,
                 logical_date=date,
                 dag_version=dag_version,
@@ -5031,7 +5031,7 @@ class TestSchedulerJob:
         date = dr16[0].logical_date + timedelta(hours=1)
         for i in range(16, 32):
             dr = dag_maker.create_dagrun(
-                run_id=f"dr2_run_{i+1}",
+                run_id=f"dr2_run_{i + 1}",
                 state=State.QUEUED,
                 logical_date=date,
                 dag_version=dag_version,
diff --git a/tests/timetables/test_assets_timetable.py 
b/tests/timetables/test_assets_timetable.py
index 0158ddfd5cf..9892b5805bd 100644
--- a/tests/timetables/test_assets_timetable.py
+++ b/tests/timetables/test_assets_timetable.py
@@ -244,24 +244,6 @@ def asset_events(mocker) -> list[AssetEvent]:
     return [event_earlier, event_later]
 
 
-def test_data_interval_for_events(
-    asset_timetable: AssetOrTimeSchedule, asset_events: list[AssetEvent]
-) -> None:
-    """
-    Tests the data_interval_for_events method of AssetOrTimeSchedule.
-
-    :param asset_timetable: The AssetOrTimeSchedule instance to test.
-    :param asset_events: A list of mock AssetEvent instances.
-    """
-    data_interval = 
asset_timetable.data_interval_for_events(logical_date=DateTime.now(), 
events=asset_events)
-    assert data_interval.start == min(
-        event.timestamp for event in asset_events
-    ), "Data interval start does not match"
-    assert data_interval.end == max(
-        event.timestamp for event in asset_events
-    ), "Data interval end does not match"
-
-
 def test_run_ordering_inheritance(asset_timetable: AssetOrTimeSchedule) -> 
None:
     """
     Tests that AssetOrTimeSchedule inherits run_ordering from its parent class 
correctly.
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 76ddb082283..0766e33b183 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -893,6 +893,8 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
 
             if AIRFLOW_V_3_0_PLUS:
                 from airflow.utils.types import DagRunTriggeredByType
+            else:
+                DagRunType.ASSET_TRIGGERED = DagRunType.DATASET_TRIGGERED
 
             if "execution_date" in kwargs:
                 raise TypeError("use logical_date instead")
@@ -947,7 +949,7 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
                 kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST)
                 kwargs["logical_date"] = logical_date
                 kwargs.setdefault("dag_version", None)
-                kwargs.setdefault("run_after", data_interval[-1])
+                kwargs.setdefault("run_after", data_interval[-1] if 
data_interval else timezone.utcnow())
             else:
                 kwargs.pop("dag_version", None)
                 kwargs.pop("triggered_by", None)

Reply via email to