This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1671f1f7d Retrieve dataset event created through RESTful API when creating dag run (#38332)
a1671f1f7d is described below

commit a1671f1f7d4c3d74de3121c0f8aa18cef8823464
Author: Wei Lee <weilee...@gmail.com>
AuthorDate: Thu Mar 21 23:44:22 2024 +0800

    Retrieve dataset event created through RESTful API when creating dag run (#38332)
    
    Fetch dataset events generated via the RESTful API during the
    creation of DAG runs. Currently, dataset events produced via
    the RESTful API are overlooked because there is no
    'source_dag_run' attribute. Furthermore, take into account the
    timestamps from dataset events created through the RESTful API
    when calculating data intervals.
---
 airflow/jobs/scheduler_job_runner.py |  1 -
 airflow/timetables/simple.py         | 18 +++++++++++-------
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index c62ebc53b5..0596e7f59f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1270,7 +1270,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         DagScheduleDatasetReference,
                        DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id,
                     )
-                    .join(DatasetEvent.source_dag_run)
                     .where(*dataset_event_filters)
                 ).all()
 
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index dea3ca2d6a..6452244262 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import operator
 from typing import TYPE_CHECKING, Any, Collection, Sequence
 
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -183,12 +182,17 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
         if not events:
             return DataInterval(logical_date, logical_date)
 
-        start = min(
-            events, key=operator.attrgetter("source_dag_run.data_interval_start")
-        ).source_dag_run.data_interval_start
-        end = max(
-            events, key=operator.attrgetter("source_dag_run.data_interval_end")
-        ).source_dag_run.data_interval_end
+        start_dates, end_dates = [], []
+        for event in events:
+            if event.source_dag_run is not None:
+                start_dates.append(event.source_dag_run.data_interval_start)
+                end_dates.append(event.source_dag_run.data_interval_end)
+            else:
+                start_dates.append(event.timestamp)
+                end_dates.append(event.timestamp)
+
+        start = min(start_dates)
+        end = max(end_dates)
         return DataInterval(start, end)
 
     def next_dagrun_info(

Reply via email to the mailing list.