This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new a89514ec38 chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past (#40719) a89514ec38 is described below commit a89514ec38d368efa9733c8376953024c8da9f1a Author: Hussein Awala <huss...@awala.fr> AuthorDate: Fri Jul 12 15:29:31 2024 +0200 chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past (#40719) * chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past * add a unit test --- airflow/sensors/time_delta.py | 5 ++++- tests/sensors/test_time_delta.py | 28 +++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 82d16bbae6..226d520aa0 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -66,9 +66,12 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): """ - def execute(self, context: Context) -> NoReturn: + def execute(self, context: Context) -> bool | NoReturn: target_dttm = context["data_interval_end"] target_dttm += self.delta + if timezone.utcnow() > target_dttm: + # If the target datetime is in the past, return immediately + return True try: trigger = DateTimeTrigger(moment=target_dttm) except (TypeError, ValueError) as e: diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py index b19af4a8a8..159cefab9c 100644 --- a/tests/sensors/test_time_delta.py +++ b/tests/sensors/test_time_delta.py @@ -18,12 +18,14 @@ from __future__ import annotations from datetime import timedelta +from unittest import mock +import pendulum import pytest from airflow.models import DagBag from airflow.models.dag import DAG -from airflow.sensors.time_delta import TimeDeltaSensor +from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync from airflow.utils.timezone import datetime pytestmark = pytest.mark.db_test @@ -32,6 +34,7 @@ pytestmark = 
pytest.mark.db_test DEFAULT_DATE = datetime(2015, 1, 1) DEV_NULL = "/dev/null" TEST_DAG_ID = "unit_tests" +REFERENCE_TIME = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0) class TestTimedeltaSensor: @@ -43,3 +46,26 @@ class TestTimedeltaSensor: def test_timedelta_sensor(self): op = TimeDeltaSensor(task_id="timedelta_sensor_check", delta=timedelta(seconds=2), dag=self.dag) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + +class TestTimeDeltaSensorAsync: + def setup_method(self): + self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True) + self.args = {"owner": "airflow", "start_date": DEFAULT_DATE} + self.dag = DAG(TEST_DAG_ID, default_args=self.args) + + @pytest.mark.parametrize( + "data_interval_end, delta, should_deffer", + [ + (REFERENCE_TIME.add(hours=-1), timedelta(hours=1), False), + (REFERENCE_TIME, timedelta(hours=1), True), + ], + ) + @mock.patch("airflow.models.baseoperator.BaseOperator.defer") + def test_timedelta_sensor(self, defer_mock, data_interval_end, delta, should_deffer): + op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag) + op.execute({"data_interval_end": data_interval_end}) + if should_deffer: + defer_mock.assert_called_once() + else: + defer_mock.assert_not_called()