This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 243b704 Add DateTimeSensor (#9697)
243b704 is described below
commit 243b704f47df00365e3421b55d8818fc9c71196f
Author: zikun <[email protected]>
AuthorDate: Fri Jul 24 00:53:10 2020 +0800
Add DateTimeSensor (#9697)
* Add DateTimeSensor
---
airflow/sensors/date_time_sensor.py | 77 ++++++++++++++++++++++++++++++++++
docs/operators-and-hooks-ref.rst | 3 ++
tests/sensors/test_date_time_sensor.py | 72 +++++++++++++++++++++++++++++++
3 files changed, 152 insertions(+)
diff --git a/airflow/sensors/date_time_sensor.py b/airflow/sensors/date_time_sensor.py
new file mode 100644
index 0000000..4af3bd2
--- /dev/null
+++ b/airflow/sensors/date_time_sensor.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+from typing import Dict, Union
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class DateTimeSensor(BaseSensorOperator):
+ """
+ Waits until the specified datetime.
+
+ A major advantage of this sensor is idempotence for the ``target_time``.
+ It handles some cases for which ``TimeSensor`` and ``TimeDeltaSensor`` are not suited.
+
+ **Example** 1 :
+ If a task needs to wait for 11am on each ``execution_date``. Using
+ ``TimeSensor`` or ``TimeDeltaSensor``, all backfill tasks started at
+ 1am have to wait for 10 hours. This is unnecessary, e.g. a backfill
+ task with ``{{ ds }} = '1970-01-01'`` does not need to wait because
+ ``1970-01-01T11:00:00`` has already passed.
+
+ **Example** 2 :
+ If a DAG is scheduled to run at 23:00 daily, but one of the tasks is
+ required to run at 01:00 next day, using ``TimeSensor`` will return
+ ``True`` immediately because 23:00 > 01:00. Instead, we can do this:
+
+ .. code-block:: python
+
+ DateTimeSensor(
+ task_id='wait_for_0100',
+ target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
+ )
+
+ :param target_time: datetime after which the job succeeds. (templated)
+ :type target_time: str or datetime.datetime
+ """
+
+ template_fields = ("target_time",)
+
+ @apply_defaults
+ def __init__(
+ self, target_time: Union[str, datetime.datetime], *args, **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ if isinstance(target_time, datetime.datetime):
+ self.target_time = target_time.isoformat()
+ elif isinstance(target_time, str):
+ self.target_time = target_time
+ else:
+ raise TypeError(
+ "Expected str or datetime.datetime type for target_time. Got {}".format(
+ type(target_time)
+ )
+ )
+
+ def poke(self, context: Dict) -> bool:
+ self.log.info("Checking if the time (%s) has come", self.target_time)
+ return timezone.utcnow() > timezone.parse(self.target_time)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 6f5028f..17ed2be 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -98,6 +98,9 @@ Fundamentals
* - :mod:`airflow.sensors.time_sensor`
-
+ * - :mod:`airflow.sensors.date_time_sensor`
+ -
+
.. _Apache:
diff --git a/tests/sensors/test_date_time_sensor.py b/tests/sensors/test_date_time_sensor.py
new file mode 100644
index 0000000..b8b81d1
--- /dev/null
+++ b/tests/sensors/test_date_time_sensor.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+from mock import patch
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.date_time_sensor import DateTimeSensor
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+
+
+class TestDateTimeSensor:
+ @classmethod
+ def setup_class(cls):
+ args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+ cls.dag = DAG("test_dag", default_args=args)
+
+ @parameterized.expand(
+ [
+ (
+ "valid_datetime",
+ timezone.datetime(2020, 7, 6, 13, tzinfo=timezone.utc),
+ "2020-07-06T13:00:00+00:00",
+ ),
+ ("valid_str", "20200706T210000+8", "20200706T210000+8"),
+ ]
+ )
+ def test_valid_input(self, task_id, target_time, expected):
+ op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag,)
+ assert op.target_time == expected
+
+ def test_invalid_input(self):
+ with pytest.raises(TypeError):
+ DateTimeSensor(
+ task_id="test", target_time=timezone.utcnow().time(), dag=self.dag,
+ )
+
+ @parameterized.expand(
+ [
+ (
+ "poke_datetime",
+ timezone.datetime(2020, 1, 1, 22, 59, tzinfo=timezone.utc),
+ True,
+ ),
+ ("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
+ ("poke_str_basic_with_tz", "20200102T065959+8", True),
+ ]
+ )
+ @patch(
+ "airflow.sensors.date_time_sensor.timezone.utcnow",
+ return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
+ )
+ def test_poke(self, task_id, target_time, expected, mock_utcnow):
+ op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
+ assert op.poke(None) == expected