This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 090a7f0707 DateTimeTrigger typing and tests (#37694)
090a7f0707 is described below

commit 090a7f0707077320225e2cb4b1bc091c8fefee34
Author: Dylan Rajguru <445696+drajg...@users.noreply.github.com>
AuthorDate: Tue Feb 27 12:37:25 2024 -0800

    DateTimeTrigger typing and tests (#37694)
---
 airflow/triggers/temporal.py    | 14 ++++---
 scripts/cov/other_coverage.py   | 84 +++++++++++++++++++++++++++++------------
 tests/triggers/test_temporal.py | 44 +++++++++++++++++++++
 3 files changed, 112 insertions(+), 30 deletions(-)

diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py
index 18bdd80bff..03f0901db7 100644
--- a/airflow/triggers/temporal.py
+++ b/airflow/triggers/temporal.py
@@ -18,7 +18,9 @@ from __future__ import annotations
 
 import asyncio
 import datetime
-from typing import Any
+from typing import Any, AsyncIterator
+
+import pendulum
 
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.utils import timezone
@@ -42,12 +44,12 @@ class DateTimeTrigger(BaseTrigger):
         elif moment.tzinfo is None:
             raise ValueError("You cannot pass naive datetimes")
         else:
-            self.moment = timezone.convert_to_utc(moment)
+            self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
         return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
 
-    async def run(self):
+    async def run(self) -> AsyncIterator[TriggerEvent]:
         """
         Loop until the relevant time is met.
 
@@ -59,13 +61,13 @@ class DateTimeTrigger(BaseTrigger):
         # Sleep in successively smaller increments starting from 1 hour down to 10 seconds at a time
         self.log.info("trigger starting")
         for step in 3600, 60, 10:
-            seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
+            seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
             while seconds_remaining > 2 * step:
                 self.log.info(f"{int(seconds_remaining)} seconds remaining; sleeping {step} seconds")
                 await asyncio.sleep(step)
-                seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
+                seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
         # Sleep a second at a time otherwise
-        while self.moment > timezone.utcnow():
+        while self.moment > pendulum.instance(timezone.utcnow()):
             self.log.info("sleeping 1 second...")
             await asyncio.sleep(1)
         # Send our single event and then we're done
diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py
index ff30725ab0..6543d2fc78 100644
--- a/scripts/cov/other_coverage.py
+++ b/scripts/cov/other_coverage.py
@@ -25,45 +25,81 @@ sys.path.insert(0, str(Path(__file__).parent.resolve()))
 
 source_files = [
     "airflow/dag_processing",
+    "airflow/triggers",
 ]
+"""
+Other potential source file packages to scan for coverage.
+You can also compare the stats against those on
+https://app.codecov.io/github/apache/airflow
+(as it combines the coverage from all tests and so may be a bit higher).
+
+    "airflow/auth",
+    "airflow/callbacks",
+    "airflow/config_templates",
+    "airflow/dag_processing",
+    "airflow/datasets",
+    "airflow/decorators",
+    "airflow/hooks",
+    "airflow/io",
+    "airflow/lineage",
+    "airflow/listeners",
+    "airflow/macros",
+    "airflow/notifications",
+    "airflow/secrets",
+    "airflow/security",
+    "airflow/sensors",
+    "airflow/task",
+    "airflow/template",
+    "airflow/timetables",
+    "airflow/triggers",
+"""
 
 files_not_fully_covered = [
     "airflow/dag_processing/manager.py",
     "airflow/dag_processing/processor.py",
+    "airflow/triggers/base.py",
+    "airflow/triggers/external_task.py",
+    "airflow/triggers/file.py",
+    "airflow/triggers/testing.py",
 ]
 
 other_tests = [
     "tests/dag_processing",
+    "tests/jobs",
+    "tests/triggers",
 ]
 
 """
-These 'other' packages can be added to the above lists
-as necessary:
+Other tests to potentially run against the source_file packages:
 
-"tests/auth",
-"tests/callbacks",
-"tests/charts",
-"tests/cluster_policies",
-"tests/config_templates",
-"tests/datasets",
-"tests/decorators",
-"tests/hooks",
-"tests/io",
-"tests/lineage",
-"tests/listeners",
-"tests/macros",
-"tests/notifications",
-"tests/plugins",
-"tests/secrets",
-"tests/security",
-"tests/sensors",
-"tests/task",
-"tests/template",
-"tests/testconfig",
-"tests/timetables",
-"tests/triggers",
+    "tests/api_internal",
+    "tests/auth",
+    "tests/callbacks",
+    "tests/charts",
+    "tests/cluster_policies",
+    "tests/config_templates",
+    "tests/dag_processing",
+    "tests/datasets",
+    "tests/decorators",
+    "tests/hooks",
+    "tests/io",
+    "tests/jobs",
+    "tests/lineage",
+    "tests/listeners",
+    "tests/macros",
+    "tests/notifications",
+    "tests/plugins",
+    "tests/secrets",
+    "tests/security",
+    "tests/sensors",
+    "tests/task",
+    "tests/template",
+    "tests/testconfig",
+    "tests/timetables",
+    "tests/triggers",
 """
 
+
 if __name__ == "__main__":
     args = ["-qq"] + other_tests
     run_tests(args, source_files, files_not_fully_covered)
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 52cc2c64f6..6e8d32c467 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import asyncio
 import datetime
+from unittest import mock
 
 import pendulum
 import pytest
@@ -25,6 +26,7 @@ import pytest
 from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
+from airflow.utils.timezone import utcnow
 
 
 def test_input_validation():
@@ -35,6 +37,16 @@ def test_input_validation():
         DateTimeTrigger("2012-01-01T03:03:03+00:00")
 
 
+def test_input_validation_tz():
+    """
+    Tests that the DateTimeTrigger validates input to moment arg, it shouldn't accept naive datetime.
+    """
+
+    moment = datetime.datetime(2013, 3, 31, 0, 59, 59)
+    with pytest.raises(ValueError, match="You cannot pass naive datetimes"):
+        DateTimeTrigger(moment)
+
+
 def test_datetime_trigger_serialization():
     """
     Tests that the DateTimeTrigger correctly serializes its arguments
@@ -96,3 +108,35 @@ async def test_datetime_trigger_timing(tz):
     result = trigger_task.result()
     assert isinstance(result, TriggerEvent)
     assert result.payload == past_moment
+
+
+@mock.patch("airflow.triggers.temporal.timezone.utcnow")
+@mock.patch("airflow.triggers.temporal.asyncio.sleep")
+@pytest.mark.asyncio
+async def test_datetime_trigger_mocked(mock_sleep, mock_utcnow):
+    """
+    Tests DateTimeTrigger with time and asyncio mocks
+    """
+    start_moment = utcnow()
+    trigger_moment = start_moment + datetime.timedelta(seconds=30)
+
+    # returns the mock 'current time'. The first 3 calls report the initial time
+    mock_utcnow.side_effect = [
+        start_moment,
+        start_moment,
+        start_moment,
+        start_moment + datetime.timedelta(seconds=20),
+        start_moment + datetime.timedelta(seconds=25),
+        start_moment + datetime.timedelta(seconds=30),
+    ]
+
+    trigger = DateTimeTrigger(trigger_moment)
+    gen = trigger.run()
+    trigger_task = asyncio.create_task(gen.__anext__())
+    await trigger_task
+    mock_sleep.assert_awaited()
+    assert mock_sleep.await_count == 2
+    assert trigger_task.done() is True
+    result = trigger_task.result()
+    assert isinstance(result, TriggerEvent)
+    assert result.payload == trigger_moment

Reply via email to