This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 090a7f0707 DateTimeTrigger typing and tests (#37694) 090a7f0707 is described below commit 090a7f0707077320225e2cb4b1bc091c8fefee34 Author: Dylan Rajguru <445696+drajg...@users.noreply.github.com> AuthorDate: Tue Feb 27 12:37:25 2024 -0800 DateTimeTrigger typing and tests (#37694) --- airflow/triggers/temporal.py | 14 ++++--- scripts/cov/other_coverage.py | 84 +++++++++++++++++++++++++++++------------ tests/triggers/test_temporal.py | 44 +++++++++++++++++++++ 3 files changed, 112 insertions(+), 30 deletions(-) diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py index 18bdd80bff..03f0901db7 100644 --- a/airflow/triggers/temporal.py +++ b/airflow/triggers/temporal.py @@ -18,7 +18,9 @@ from __future__ import annotations import asyncio import datetime -from typing import Any +from typing import Any, AsyncIterator + +import pendulum from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils import timezone @@ -42,12 +44,12 @@ class DateTimeTrigger(BaseTrigger): elif moment.tzinfo is None: raise ValueError("You cannot pass naive datetimes") else: - self.moment = timezone.convert_to_utc(moment) + self.moment: pendulum.DateTime = timezone.convert_to_utc(moment) def serialize(self) -> tuple[str, dict[str, Any]]: return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) - async def run(self): + async def run(self) -> AsyncIterator[TriggerEvent]: """ Loop until the relevant time is met. @@ -59,13 +61,13 @@ class DateTimeTrigger(BaseTrigger): # Sleep in successively smaller increments starting from 1 hour down to 10 seconds at a time self.log.info("trigger starting") for step in 3600, 60, 10: - seconds_remaining = (self.moment - timezone.utcnow()).total_seconds() + seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds() while seconds_remaining > 2 * step: self.log.info(f"{int(seconds_remaining)} seconds remaining; sleeping {step} seconds") await asyncio.sleep(step) - seconds_remaining = (self.moment - timezone.utcnow()).total_seconds() + seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds() # Sleep a second at a time otherwise - while self.moment > timezone.utcnow(): + while self.moment > pendulum.instance(timezone.utcnow()): self.log.info("sleeping 1 second...") await asyncio.sleep(1) # Send our single event and then we're done diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py index ff30725ab0..6543d2fc78 100644 --- a/scripts/cov/other_coverage.py +++ b/scripts/cov/other_coverage.py @@ -25,45 +25,81 @@ sys.path.insert(0, str(Path(__file__).parent.resolve())) source_files = [ "airflow/dag_processing", + "airflow/triggers", ] +""" +Other potential source file packages to scan for coverage. +You can also compare the stats against those on +https://app.codecov.io/github/apache/airflow +(as it combines the coverage from all tests and so may be a bit higher). + + "airflow/auth", + "airflow/callbacks", + "airflow/config_templates", + "airflow/dag_processing", + "airflow/datasets", + "airflow/decorators", + "airflow/hooks", + "airflow/io", + "airflow/lineage", + "airflow/listeners", + "airflow/macros", + "airflow/notifications", + "airflow/secrets", + "airflow/security", + "airflow/sensors", + "airflow/task", + "airflow/template", + "airflow/timetables", + "airflow/triggers", +""" files_not_fully_covered = [ "airflow/dag_processing/manager.py", "airflow/dag_processing/processor.py", + "airflow/triggers/base.py", + "airflow/triggers/external_task.py", + "airflow/triggers/file.py", + "airflow/triggers/testing.py", ] other_tests = [ "tests/dag_processing", + "tests/jobs", + "tests/triggers", ] """ -These 'other' packages can be added to the above lists -as necessary: +Other tests to potentially run against the source_file packages: -"tests/auth", -"tests/callbacks", -"tests/charts", -"tests/cluster_policies", -"tests/config_templates", -"tests/datasets", -"tests/decorators", -"tests/hooks", -"tests/io", -"tests/lineage", -"tests/listeners", -"tests/macros", -"tests/notifications", -"tests/plugins", -"tests/secrets", -"tests/security", -"tests/sensors", -"tests/task", -"tests/template", -"tests/testconfig", -"tests/timetables", -"tests/triggers", + "tests/api_internal", + "tests/auth", + "tests/callbacks", + "tests/charts", + "tests/cluster_policies", + "tests/config_templates", + "tests/dag_processing", + "tests/datasets", + "tests/decorators", + "tests/hooks", + "tests/io", + "tests/jobs", + "tests/lineage", + "tests/listeners", + "tests/macros", + "tests/notifications", + "tests/plugins", + "tests/secrets", + "tests/security", + "tests/sensors", + "tests/task", + "tests/template", + "tests/testconfig", + "tests/timetables", + "tests/triggers", """ + if __name__ == "__main__": args = ["-qq"] + other_tests run_tests(args, source_files, files_not_fully_covered) diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 52cc2c64f6..6e8d32c467 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -18,6 +18,7 @@ from __future__ import annotations import asyncio import datetime +from unittest import mock import pendulum import pytest @@ -25,6 +26,7 @@ import pytest from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone +from airflow.utils.timezone import utcnow def test_input_validation(): @@ -35,6 +37,16 @@ def test_input_validation(): DateTimeTrigger("2012-01-01T03:03:03+00:00") +def test_input_validation_tz(): + """ + Tests that the DateTimeTrigger validates input to moment arg, it shouldn't accept naive datetime. + """ + + moment = datetime.datetime(2013, 3, 31, 0, 59, 59) + with pytest.raises(ValueError, match="You cannot pass naive datetimes"): + DateTimeTrigger(moment) + + def test_datetime_trigger_serialization(): """ Tests that the DateTimeTrigger correctly serializes its arguments @@ -96,3 +108,35 @@ async def test_datetime_trigger_timing(tz): result = trigger_task.result() assert isinstance(result, TriggerEvent) assert result.payload == past_moment + + +@mock.patch("airflow.triggers.temporal.timezone.utcnow") +@mock.patch("airflow.triggers.temporal.asyncio.sleep") +@pytest.mark.asyncio +async def test_datetime_trigger_mocked(mock_sleep, mock_utcnow): + """ + Tests DateTimeTrigger with time and asyncio mocks + """ + start_moment = utcnow() + trigger_moment = start_moment + datetime.timedelta(seconds=30) + + # returns the mock 'current time'. The first 3 calls report the initial time + mock_utcnow.side_effect = [ + start_moment, + start_moment, + start_moment, + start_moment + datetime.timedelta(seconds=20), + start_moment + datetime.timedelta(seconds=25), + start_moment + datetime.timedelta(seconds=30), + ] + + trigger = DateTimeTrigger(trigger_moment) + gen = trigger.run() + trigger_task = asyncio.create_task(gen.__anext__()) + await trigger_task + mock_sleep.assert_awaited() + assert mock_sleep.await_count == 2 + assert trigger_task.done() is True + result = trigger_task.result() + assert isinstance(result, TriggerEvent) + assert result.payload == trigger_moment