This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c01daf8119 Do not let EventsTimetable schedule past events if catchup=False (#36134)
c01daf8119 is described below

commit c01daf811925816e9ae09b78c37b9ff8d87ce691
Author: Aleksey Kirilishin <54231417+avkirilis...@users.noreply.github.com>
AuthorDate: Wed Jan 10 19:29:02 2024 +0300

    Do not let EventsTimetable schedule past events if catchup=False (#36134)
    
    * Fix the EventsTimetable schedules past events bug
    ---------
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow/timetables/events.py              | 41 +++++++++++++-------
 tests/timetables/test_events_timetable.py | 64 +++++++++++++++++++++++++++++--
 2 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py
index 1998b12d46..c8fd65c2a9 100644
--- a/airflow/timetables/events.py
+++ b/airflow/timetables/events.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Iterable
 import pendulum
 
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
+from airflow.utils import timezone
 
 if TYPE_CHECKING:
     from pendulum import DateTime
@@ -58,10 +59,13 @@ class EventsTimetable(Timetable):
             self.event_dates.sort()
         self.restrict_to_events = restrict_to_events
         if description is None:
-            self.description = (
-                f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
-            )
-            self._summary = f"{len(self.event_dates)} Events"
+            if self.event_dates:
+                self.description = (
+                    f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
+                )
+            else:
+                self.description = "No events"
+            self._summary = f"{len(self.event_dates)} events"
         else:
             self._summary = description
             self.description = description
@@ -79,22 +83,31 @@ class EventsTimetable(Timetable):
         last_automated_data_interval: DataInterval | None,
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
-        if last_automated_data_interval is None:
-            next_event = self.event_dates[0]
+        earliest = restriction.earliest
+        if not restriction.catchup:
+            current_time = timezone.utcnow()
+            if earliest is None or current_time > earliest:
+                earliest = pendulum.instance(current_time)
+
+        for next_event in self.event_dates:
+            if earliest and next_event < earliest:
+                continue
+            if last_automated_data_interval and next_event <= last_automated_data_interval.end:
+                continue
+            break
         else:
-            future_dates = itertools.dropwhile(
-                lambda when: when <= last_automated_data_interval.end,  # type: ignore
-                self.event_dates,
-            )
-            next_event = next(future_dates, None)  # type: ignore
-            if next_event is None:
-                return None
+            # We need to return None if self.event_dates is empty or,
+            # if not empty, when no suitable event can be found.
+            return None
+
+        if restriction.latest is not None and next_event > restriction.latest:
+            return None
 
         return DagRunInfo.exact(next_event)
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
         # If Timetable not restricted to events, run for the time specified
-        if not self.restrict_to_events:
+        if not self.restrict_to_events or not self.event_dates:
             return DataInterval.exact(run_after)
 
         # If restricted to events, run for the most recent past event
diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py
index e743000f07..39d1fd3431 100644
--- a/tests/timetables/test_events_timetable.py
+++ b/tests/timetables/test_events_timetable.py
@@ -19,12 +19,14 @@ from __future__ import annotations
 
 import pendulum
 import pytest
+import time_machine
 
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.events import EventsTimetable
 from airflow.utils.timezone import utc
 
-START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)  # Precedes all events
+BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)  # Precedes all events
+START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)
 
 EVENT_DATES = [
     pendulum.DateTime(2021, 9, 6, tzinfo=utc),
@@ -93,7 +95,7 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
     Test that when using strict event dates, manual runs before the first event have the first event's date
     as the start interval
     """
-    manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
+    manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
     expected_data_interval = DataInterval.exact(EVENT_DATES[0])
     assert expected_data_interval == manual_run_data_interval
 
@@ -101,8 +103,15 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
 @pytest.mark.parametrize(
     "last_automated_data_interval, expected_next_info",
     [
+        pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
+        pytest.param(
+            DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
+            DagRunInfo.interval(START_DATE, START_DATE),
+        ),
+    ]
+    + [
         pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
-        for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
+        for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
     ]
     + [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
 )
@@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule(
         restriction=restriction,
     )
     assert next_info == expected_next_info
+
+
+@pytest.mark.parametrize(
+    "current_date",
+    [
+        pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
+        pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
+        pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
+    ],
+)
+@pytest.mark.parametrize(
+    "last_automated_data_interval",
+    [
+        pytest.param(None, id="first-run"),
+        pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
+    ],
+)
+def test_no_catchup_first_starts(
+    last_automated_data_interval: DataInterval | None,
+    current_date,
+    unrestricted_timetable: Timetable,
+) -> None:
+    # we don't use the last_automated_data_interval here because it's always less than the first event
+    expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0])
+    expected_info = None
+    if expected_date <= EVENT_DATES_SORTED[-1]:
+        expected_info = DagRunInfo.interval(start=expected_date, end=expected_date)
+
+    with time_machine.travel(current_date):
+        next_info = unrestricted_timetable.next_dagrun_info(
+            last_automated_data_interval=last_automated_data_interval,
+            restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
+        )
+    assert next_info == expected_info
+
+
+def test_empty_timetable() -> None:
+    empty_timetable = EventsTimetable(event_dates=[])
+    next_info = empty_timetable.next_dagrun_info(
+        last_automated_data_interval=None,
+        restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
+    )
+    assert next_info is None
+
+
+def test_empty_timetable_manual_run() -> None:
+    empty_timetable = EventsTimetable(event_dates=[])
+    manual_run_data_interval = empty_timetable.infer_manual_data_interval(run_after=START_DATE)
+    assert manual_run_data_interval == DataInterval(start=START_DATE, end=START_DATE)

Reply via email to