This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a898a1bce88 feat: merge TimeDeltaSensorAsync to TimeDeltaSensor
(#51133)
a898a1bce88 is described below
commit a898a1bce88c1566cc0e7a366b420a8b7b8f68fe
Author: James Hyphen <[email protected]>
AuthorDate: Thu May 29 12:06:26 2025 +0200
feat: merge TimeDeltaSensorAsync to TimeDeltaSensor (#51133)
* feat: merge TimeDeltaSensorAsync to TimeDeltaSensor
* feat: add a test for deferrable TimeDeltaSensor
* feat: add example, simplify code in TimeDeltaSensor (as per TimeSensor)
* feat: PR comments & adapt failing tests due to deprecation
* feat: get rid of airflow_venv/ from gitignore
* feat: remove unused variables
* Update
providers/standard/src/airflow/providers/standard/sensors/time_delta.py
Co-authored-by: Wei Lee <[email protected]>
* feat: resolve test errors
---------
Co-authored-by: Wei Lee <[email protected]>
---
.../openlineage/example_openlineage_defer.py | 27 ++--
.../providers/standard/sensors/time_delta.py | 64 +++++++---
.../tests/system/standard/example_sensors.py | 6 +-
.../tests/unit/standard/sensors/test_time_delta.py | 137 +++++++++++++++------
4 files changed, 161 insertions(+), 73 deletions(-)
diff --git
a/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py
b/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py
index f8589ad6310..69b021cf43f 100644
---
a/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py
+++
b/providers/openlineage/tests/system/openlineage/example_openlineage_defer.py
@@ -21,9 +21,11 @@ from datetime import datetime, timedelta
from pathlib import Path
from airflow import DAG
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import Variable
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync
+import pytest
from system.openlineage.operator import OpenLineageTestOperator
@@ -45,21 +47,22 @@ with DAG(
schedule=None,
catchup=False,
) as dag:
- # Timedelta is compared to the DAGRun start timestamp, which can occur
long before a worker picks up the
- # task. We need to ensure the sensor gets deferred at least once, so
setting 180s.
- wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=180))
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ # Timedelta is compared to the DAGRun start timestamp, which can occur
long before a worker picks up the
+ # task. We need to ensure the sensor gets deferred at least once, so
setting 180s.
+ wait = TimeDeltaSensorAsync(task_id="wait",
delta=timedelta(seconds=180))
- check_start_events_amount = PythonOperator(
- task_id="check_start_events_amount",
python_callable=check_start_amount_func
- )
+ check_start_events_amount = PythonOperator(
+ task_id="check_start_events_amount",
python_callable=check_start_amount_func
+ )
- check_events = OpenLineageTestOperator(
- task_id="check_events",
- file_path=str(Path(__file__).parent /
"example_openlineage_defer.json"),
- allow_duplicate_events=True,
- )
+ check_events = OpenLineageTestOperator(
+ task_id="check_events",
+ file_path=str(Path(__file__).parent /
"example_openlineage_defer.json"),
+ allow_duplicate_events=True,
+ )
- wait >> check_start_events_amount >> check_events
+ wait >> check_start_events_amount >> check_events
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
diff --git
a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py
b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py
index 35d5c83e575..40ad809343b 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/time_delta.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/time_delta.py
@@ -17,14 +17,16 @@
# under the License.
from __future__ import annotations
+import warnings
from datetime import datetime, timedelta
from time import sleep
from typing import TYPE_CHECKING, Any, NoReturn
+from deprecated.classic import deprecated
from packaging.version import Version
from airflow.configuration import conf
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowProviderDeprecationWarning,
AirflowSkipException
from airflow.providers.standard.triggers.temporal import DateTimeTrigger,
TimeDeltaTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
@@ -52,6 +54,7 @@ class TimeDeltaSensor(BaseSensorOperator):
otherwise run_after will be used.
:param delta: time to wait before succeeding.
+ :param deferrable: Run sensor in deferrable mode. If set to True, task
will defer itself to avoid taking up a worker slot while it is waiting.
.. seealso::
For more information on how to use this sensor, take a look at the
guide:
@@ -59,9 +62,18 @@ class TimeDeltaSensor(BaseSensorOperator):
"""
- def __init__(self, *, delta, **kwargs):
+ def __init__(
+ self,
+ *,
+ delta: timedelta,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ end_from_trigger: bool = False,
+ **kwargs,
+ ):
super().__init__(**kwargs)
self.delta = delta
+ self.deferrable = deferrable
+ self.end_from_trigger = end_from_trigger
def _derive_base_time(self, context: Context) -> datetime:
"""
@@ -90,27 +102,21 @@ class TimeDeltaSensor(BaseSensorOperator):
self.log.info("Checking if the delta has elapsed base_time=%s,
delta=%s", base_time, self.delta)
return timezone.utcnow() > target_dttm
-
-class TimeDeltaSensorAsync(TimeDeltaSensor):
"""
- A deferrable drop-in replacement for TimeDeltaSensor.
-
- Will defers itself to avoid taking up a worker slot while it is waiting.
-
- :param delta: time length to wait after the data interval before
succeeding.
- :param end_from_trigger: End the task directly from the triggerer without
going into the worker.
-
- .. seealso::
- For more information on how to use this sensor, take a look at the
guide:
- :ref:`howto/operator:TimeDeltaSensorAsync`
-
+ Asynchronous execution
"""
- def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs) ->
None:
- super().__init__(delta=delta, **kwargs)
- self.end_from_trigger = end_from_trigger
-
def execute(self, context: Context) -> bool | NoReturn:
+ """
+ Depending on the deferrable flag, either execute the sensor in a
blocking way or defer it.
+
+ - Sync path → use BaseSensorOperator.execute() which loops over
``poke``.
+ - Async path → defer to DateTimeTrigger and free the worker slot.
+ """
+ if not self.deferrable:
+ return super().execute(context=context)
+
+ # Deferrable path
base_time = self._derive_base_time(context=context)
target_dttm: datetime = base_time + self.delta
@@ -146,6 +152,26 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
return None
+# TODO: Remove in the next major release
+@deprecated(
+ "Use `TimeDeltaSensor` with `deferrable=True` instead",
category=AirflowProviderDeprecationWarning
+)
+class TimeDeltaSensorAsync(TimeDeltaSensor):
+ """
+ Deprecated. Use TimeDeltaSensor with deferrable=True instead.
+
+ :sphinx-autoapi-skip:
+ """
+
+ def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs) ->
None:
+ warnings.warn(
+ "TimeDeltaSensorAsync is deprecated and will be removed in a
future version. Use `TimeDeltaSensor` with `deferrable=True` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ super().__init__(delta=delta, deferrable=True,
end_from_trigger=end_from_trigger, **kwargs)
+
+
class WaitSensor(BaseSensorOperator):
"""
A sensor that waits a specified period of time before completing.
diff --git a/providers/standard/tests/system/standard/example_sensors.py
b/providers/standard/tests/system/standard/example_sensors.py
index bc151f140c6..79d8e6d6288 100644
--- a/providers/standard/tests/system/standard/example_sensors.py
+++ b/providers/standard/tests/system/standard/example_sensors.py
@@ -26,7 +26,7 @@ from airflow.providers.standard.sensors.bash import BashSensor
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.providers.standard.sensors.time import TimeSensor
-from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor,
TimeDeltaSensorAsync
+from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.providers.standard.utils.weekday import WeekDay
from airflow.sdk import DAG
@@ -57,7 +57,9 @@ with DAG(
# [END example_time_delta_sensor]
# [START example_time_delta_sensor_async]
- t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async",
delta=datetime.timedelta(seconds=2))
+ t0a = TimeDeltaSensor(
+ task_id="wait_some_seconds_async",
delta=datetime.timedelta(seconds=2), deferrable=True
+ )
# [END example_time_delta_sensor_async]
# [START example_time_sensors]
diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py
b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
index 12838f4364d..a42c5ad72cd 100644
--- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py
+++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
@@ -18,13 +18,14 @@
from __future__ import annotations
from datetime import timedelta
+from typing import Any
from unittest import mock
import pendulum
import pytest
import time_machine
-from airflow.exceptions import TaskDeferred
+from airflow.exceptions import AirflowProviderDeprecationWarning, TaskDeferred
from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.time_delta import (
@@ -32,6 +33,7 @@ from airflow.providers.standard.sensors.time_delta import (
TimeDeltaSensorAsync,
WaitSensor,
)
+from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import timezone
from airflow.utils.timezone import datetime
@@ -105,6 +107,57 @@ def test_timedelta_sensor_run_after_vs_interval(run_after,
interval_end, dag_mak
assert actual == expected
+@pytest.mark.parametrize(
+ "run_after, interval_end",
+ [
+ (timezone.utcnow() + timedelta(days=1), timezone.utcnow() +
timedelta(days=2)),
+ (timezone.utcnow() + timedelta(days=1), None),
+ ],
+)
+def test_timedelta_sensor_deferrable_run_after_vs_interval(run_after,
interval_end, dag_maker):
+ """Test that TimeDeltaSensor defers correctly when flag is enabled."""
+ if not AIRFLOW_V_3_0_PLUS and not interval_end:
+ pytest.skip("not applicable")
+
+ context: dict[str, Any] = {}
+ if interval_end:
+ context["data_interval_end"] = interval_end
+
+ with dag_maker() as dag:
+ kwargs = {}
+ if AIRFLOW_V_3_0_PLUS:
+ from airflow.utils.types import DagRunTriggeredByType
+
+ kwargs.update(triggered_by=DagRunTriggeredByType.TEST,
run_after=run_after)
+
+ delta = timedelta(minutes=5)
+ sensor = TimeDeltaSensor(
+ task_id="timedelta_sensor_deferrable",
+ delta=delta,
+ dag=dag,
+ deferrable=True, # <-- the feature under test
+ )
+
+ dr = dag.create_dagrun(
+ run_id="abcrhroceuh",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ **kwargs,
+ )
+ context.update(dag_run=dr)
+
+ expected_base = interval_end or run_after
+ expected_fire_time = expected_base + delta
+
+ with pytest.raises(TaskDeferred) as td:
+ sensor.execute(context)
+
+ # The sensor should defer once with a DateTimeTrigger
+ trigger = td.value.trigger
+ assert isinstance(trigger, DateTimeTrigger)
+ assert trigger.moment == expected_fire_time
+
+
class TestTimeDeltaSensorAsync:
def setup_method(self):
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
@@ -117,17 +170,20 @@ class TestTimeDeltaSensorAsync:
)
@mock.patch(DEFER_PATH)
def test_timedelta_sensor(self, defer_mock, should_defer):
- delta = timedelta(hours=1)
- op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check",
delta=delta, dag=self.dag)
- if should_defer:
- data_interval_end = pendulum.now("UTC").add(hours=1)
- else:
- data_interval_end = pendulum.now("UTC").replace(microsecond=0,
second=0, minute=0).add(hours=-1)
- op.execute({"data_interval_end": data_interval_end})
- if should_defer:
- defer_mock.assert_called_once()
- else:
- defer_mock.assert_not_called()
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ delta = timedelta(hours=1)
+ op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check",
delta=delta, dag=self.dag)
+ if should_defer:
+ data_interval_end = pendulum.now("UTC").add(hours=1)
+ else:
+ data_interval_end = (
+ pendulum.now("UTC").replace(microsecond=0, second=0,
minute=0).add(hours=-1)
+ )
+ op.execute({"data_interval_end": data_interval_end})
+ if should_defer:
+ defer_mock.assert_called_once()
+ else:
+ defer_mock.assert_not_called()
@pytest.mark.parametrize(
"should_defer",
@@ -157,31 +213,32 @@ class TestTimeDeltaSensorAsync:
)
def test_timedelta_sensor_async_run_after_vs_interval(self, run_after,
interval_end, dag_maker):
"""Interval end should be used as base time when present else
run_after"""
- if not AIRFLOW_V_3_0_PLUS and not interval_end:
- pytest.skip("not applicable")
-
- context = {}
- if interval_end:
- context["data_interval_end"] = interval_end
- with dag_maker() as dag:
- kwargs = {}
- if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
- kwargs.update(triggered_by=DagRunTriggeredByType.TEST,
run_after=run_after)
-
- dr = dag.create_dagrun(
- run_id="abcrhroceuh",
- run_type=DagRunType.MANUAL,
- state=None,
- **kwargs,
- )
- context.update(dag_run=dr)
- delta = timedelta(seconds=1)
- op = TimeDeltaSensorAsync(task_id="wait_sensor_check",
delta=delta, dag=dag)
- base_time = interval_end or run_after
- expected_time = base_time + delta
- with pytest.raises(TaskDeferred) as caught:
- op.execute(context)
-
- assert caught.value.trigger.moment == expected_time
+ with pytest.warns(AirflowProviderDeprecationWarning):
+ if not AIRFLOW_V_3_0_PLUS and not interval_end:
+ pytest.skip("not applicable")
+
+ context = {}
+ if interval_end:
+ context["data_interval_end"] = interval_end
+ with dag_maker() as dag:
+ kwargs = {}
+ if AIRFLOW_V_3_0_PLUS:
+ from airflow.utils.types import DagRunTriggeredByType
+
+ kwargs.update(triggered_by=DagRunTriggeredByType.TEST,
run_after=run_after)
+
+ dr = dag.create_dagrun(
+ run_id="abcrhroceuh",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ **kwargs,
+ )
+ context.update(dag_run=dr)
+ delta = timedelta(seconds=1)
+ op = TimeDeltaSensorAsync(task_id="wait_sensor_check",
delta=delta, dag=dag)
+ base_time = interval_end or run_after
+ expected_time = base_time + delta
+ with pytest.raises(TaskDeferred) as caught:
+ op.execute(context)
+
+ assert caught.value.trigger.moment == expected_time