This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c01daf8119 Do not let EventsTimetable schedule past events if catchup=False (#36134) c01daf8119 is described below commit c01daf811925816e9ae09b78c37b9ff8d87ce691 Author: Aleksey Kirilishin <54231417+avkirilis...@users.noreply.github.com> AuthorDate: Wed Jan 10 19:29:02 2024 +0300 Do not let EventsTimetable schedule past events if catchup=False (#36134) * Fix the EventsTimetable schedules past events bug --------- Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> --- airflow/timetables/events.py | 41 +++++++++++++------- tests/timetables/test_events_timetable.py | 64 +++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py index 1998b12d46..c8fd65c2a9 100644 --- a/airflow/timetables/events.py +++ b/airflow/timetables/events.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Iterable import pendulum from airflow.timetables.base import DagRunInfo, DataInterval, Timetable +from airflow.utils import timezone if TYPE_CHECKING: from pendulum import DateTime @@ -58,10 +59,13 @@ class EventsTimetable(Timetable): self.event_dates.sort() self.restrict_to_events = restrict_to_events if description is None: - self.description = ( - f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}" - ) - self._summary = f"{len(self.event_dates)} Events" + if self.event_dates: + self.description = ( + f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}" + ) + else: + self.description = "No events" + self._summary = f"{len(self.event_dates)} events" else: self._summary = description self.description = description @@ -79,22 +83,31 @@ class EventsTimetable(Timetable): last_automated_data_interval: DataInterval | None, restriction: TimeRestriction, ) -> DagRunInfo | None: - if last_automated_data_interval is None: - next_event = self.event_dates[0] 
+ earliest = restriction.earliest + if not restriction.catchup: + current_time = timezone.utcnow() + if earliest is None or current_time > earliest: + earliest = pendulum.instance(current_time) + + for next_event in self.event_dates: + if earliest and next_event < earliest: + continue + if last_automated_data_interval and next_event <= last_automated_data_interval.end: + continue + break else: - future_dates = itertools.dropwhile( - lambda when: when <= last_automated_data_interval.end, # type: ignore - self.event_dates, - ) - next_event = next(future_dates, None) # type: ignore - if next_event is None: - return None + # We need to return None if self.event_dates is empty or, + # if not empty, when no suitable event can be found. + return None + + if restriction.latest is not None and next_event > restriction.latest: + return None return DagRunInfo.exact(next_event) def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # If Timetable not restricted to events, run for the time specified - if not self.restrict_to_events: + if not self.restrict_to_events or not self.event_dates: return DataInterval.exact(run_after) # If restricted to events, run for the most recent past event diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py index e743000f07..39d1fd3431 100644 --- a/tests/timetables/test_events_timetable.py +++ b/tests/timetables/test_events_timetable.py @@ -19,12 +19,14 @@ from __future__ import annotations import pendulum import pytest +import time_machine from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.events import EventsTimetable from airflow.utils.timezone import utc -START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events +BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events +START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc) EVENT_DATES = [ pendulum.DateTime(2021, 9, 6, 
tzinfo=utc), @@ -93,7 +95,7 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict Test that when using strict event dates, manual runs before the first event have the first event's date as the start interval """ - manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE) + manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE) expected_data_interval = DataInterval.exact(EVENT_DATES[0]) assert expected_data_interval == manual_run_data_interval @@ -101,8 +103,15 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict @pytest.mark.parametrize( "last_automated_data_interval, expected_next_info", [ + pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)), + pytest.param( + DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]), + DagRunInfo.interval(START_DATE, START_DATE), + ), + ] + + [ pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2)) - for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:]) + for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:]) ] + [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)], ) @@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule( restriction=restriction, ) assert next_info == expected_next_info + + +@pytest.mark.parametrize( + "current_date", + [ + pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"), + pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"), + pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"), + ], +) +@pytest.mark.parametrize( + "last_automated_data_interval", + [ + pytest.param(None, id="first-run"), + pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"), + ], +) +def test_no_catchup_first_starts( + 
last_automated_data_interval: DataInterval | None, + current_date, + unrestricted_timetable: Timetable, +) -> None: + # we don't use the last_automated_data_interval here because it's always less than the first event + expected_date = max(current_date, START_DATE, EVENT_DATES_SORTED[0]) + expected_info = None + if expected_date <= EVENT_DATES_SORTED[-1]: + expected_info = DagRunInfo.interval(start=expected_date, end=expected_date) + + with time_machine.travel(current_date): + next_info = unrestricted_timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), + ) + assert next_info == expected_info + + +def test_empty_timetable() -> None: + empty_timetable = EventsTimetable(event_dates=[]) + next_info = empty_timetable.next_dagrun_info( + last_automated_data_interval=None, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), + ) + assert next_info is None + + +def test_empty_timetable_manual_run() -> None: + empty_timetable = EventsTimetable(event_dates=[]) + manual_run_data_interval = empty_timetable.infer_manual_data_interval(run_after=START_DATE) + assert manual_run_data_interval == DataInterval(start=START_DATE, end=START_DATE)