This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a89514ec38 chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past (#40719)
a89514ec38 is described below

commit a89514ec38d368efa9733c8376953024c8da9f1a
Author: Hussein Awala <huss...@awala.fr>
AuthorDate: Fri Jul 12 15:29:31 2024 +0200

    chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past (#40719)
    
    * chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm is in the past
    
    * add a unit test
---
 airflow/sensors/time_delta.py    |  5 ++++-
 tests/sensors/test_time_delta.py | 28 +++++++++++++++++++++++++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index 82d16bbae6..226d520aa0 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -66,9 +66,12 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
 
     """
 
-    def execute(self, context: Context) -> NoReturn:
+    def execute(self, context: Context) -> bool | NoReturn:
         target_dttm = context["data_interval_end"]
         target_dttm += self.delta
+        if timezone.utcnow() > target_dttm:
+            # If the target datetime is in the past, return immediately
+            return True
         try:
             trigger = DateTimeTrigger(moment=target_dttm)
         except (TypeError, ValueError) as e:
diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py
index b19af4a8a8..159cefab9c 100644
--- a/tests/sensors/test_time_delta.py
+++ b/tests/sensors/test_time_delta.py
@@ -18,12 +18,14 @@
 from __future__ import annotations
 
 from datetime import timedelta
+from unittest import mock
 
+import pendulum
 import pytest
 
 from airflow.models import DagBag
 from airflow.models.dag import DAG
-from airflow.sensors.time_delta import TimeDeltaSensor
+from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
 from airflow.utils.timezone import datetime
 
 pytestmark = pytest.mark.db_test
@@ -32,6 +34,7 @@ pytestmark = pytest.mark.db_test
 DEFAULT_DATE = datetime(2015, 1, 1)
 DEV_NULL = "/dev/null"
 TEST_DAG_ID = "unit_tests"
+REFERENCE_TIME = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0)
 
 
 class TestTimedeltaSensor:
@@ -43,3 +46,26 @@ class TestTimedeltaSensor:
     def test_timedelta_sensor(self):
         op = TimeDeltaSensor(task_id="timedelta_sensor_check", delta=timedelta(seconds=2), dag=self.dag)
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
+class TestTimeDeltaSensorAsync:
+    def setup_method(self):
+        self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
+        self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+        self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+
+    @pytest.mark.parametrize(
+        "data_interval_end, delta, should_deffer",
+        [
+            (REFERENCE_TIME.add(hours=-1), timedelta(hours=1), False),
+            (REFERENCE_TIME, timedelta(hours=1), True),
+        ],
+    )
+    @mock.patch("airflow.models.baseoperator.BaseOperator.defer")
+    def test_timedelta_sensor(self, defer_mock, data_interval_end, delta, should_deffer):
+        op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag)
+        op.execute({"data_interval_end": data_interval_end})
+        if should_deffer:
+            defer_mock.assert_called_once()
+        else:
+            defer_mock.assert_not_called()

Reply via email to