This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 43cd627100d AIP-86 - Move DeadlineAlert into the SDK since it is
direct user-facing code (#51326)
43cd627100d is described below
commit 43cd627100da2a8dbad0bf55af308a299976d1c8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri Jun 6 13:18:42 2025 -0700
AIP-86 - Move DeadlineAlert into the SDK since it is direct user-facing
code (#51326)
* AIP-86 - Move DeadlineAlert into the SDK since it is direct user-facing
code
* Remove caplog from Deadline unit tests per this email thread:
https://lists.apache.org/thread/f7t5zl6t3t0s89rt37orfcv4966crojt
---------
Co-authored-by: Niko Oliveira <[email protected]>
---
airflow-core/src/airflow/models/deadline.py | 64 +-----------------
.../airflow/serialization/serialized_objects.py | 2 +-
airflow-core/tests/unit/models/test_deadline.py | 62 ++----------------
task-sdk/src/airflow/sdk/definitions/dag.py | 2 +-
task-sdk/src/airflow/sdk/definitions/deadline.py | 76 ++++++++++++++++++++--
.../tests/task_sdk/definitions/test_deadline.py | 47 ++++++++++++-
6 files changed, 127 insertions(+), 126 deletions(-)
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index a769ea0b1d3..dfa4bb104c0 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -19,8 +19,8 @@ from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any, Callable
+from datetime import datetime
+from typing import TYPE_CHECKING, Any
import sqlalchemy_jsonfield
import uuid6
@@ -32,14 +32,12 @@ from sqlalchemy_utils import UUIDType
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.module_loading import import_string, is_valid_dotpath
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
if TYPE_CHECKING:
from sqlalchemy.orm import Session
- from airflow.sdk.definitions.deadline import DeadlineReference
logger = logging.getLogger(__name__)
@@ -168,63 +166,7 @@ class ReferenceModels:
return _fetch_from_db(DagRun.queued_at, **kwargs)
-class DeadlineAlert:
- """Store Deadline values needed to calculate the need-by timestamp and the
callback information."""
-
- def __init__(
- self,
- reference: DeadlineReference,
- interval: timedelta,
- callback: Callable | str,
- callback_kwargs: dict | None = None,
- ):
- self.reference = reference
- self.interval = interval
- self.callback_kwargs = callback_kwargs
- self.callback = self.get_callback_path(callback)
-
- @staticmethod
- def get_callback_path(_callback: str | Callable) -> str:
- if callable(_callback):
- # Get the reference path to the callable in the form
`airflow.models.deadline.get_from_db`
- return f"{_callback.__module__}.{_callback.__qualname__}"
-
- if not isinstance(_callback, str) or not
is_valid_dotpath(_callback.strip()):
- raise ImportError(f"`{_callback}` doesn't look like a valid dot
path.")
-
- stripped_callback = _callback.strip()
-
- try:
- # The provided callback is a string which appears to be a valid
dotpath, attempt to import it.
- callback = import_string(stripped_callback)
- except ImportError as e:
- # Logging here instead of failing because it is possible that the
code for the callable
- # exists somewhere other than on the DAG processor. We are making
a best effort to validate,
- # but can't rule out that it may be available at runtime even if
it can not be imported here.
- logger.debug(
- "Callback %s is formatted like a callable dotpath, but could
not be imported.\n%s",
- stripped_callback,
- e,
- )
- return stripped_callback
-
- # If we get this far then the input is a string which can be imported,
check if it is a callable.
- if not callable(callback):
- raise AttributeError(f"Provided callback {callback} is not
callable.")
-
- return stripped_callback
-
- def serialize_deadline_alert(self):
- from airflow.serialization.serialized_objects import BaseSerialization
-
- return BaseSerialization.serialize(
- {
- "reference": self.reference,
- "interval": self.interval,
- "callback": self.callback,
- "callback_kwargs": self.callback_kwargs,
- }
- )
+DeadlineReferenceType = ReferenceModels.BaseDeadlineReference
@provide_session
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index ed058ebb090..72962eeb444 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -44,7 +44,6 @@ from airflow.exceptions import AirflowException,
SerializationError, TaskDeferre
from airflow.models.baseoperator import BaseOperator
from airflow.models.connection import Connection
from airflow.models.dag import DAG, _get_model_data_interval
-from airflow.models.deadline import DeadlineAlert
from airflow.models.expandinput import (
create_expand_input,
)
@@ -66,6 +65,7 @@ from airflow.sdk.definitions.asset import (
AssetWatcher,
BaseAsset,
)
+from airflow.sdk.definitions.deadline import DeadlineAlert
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py
index a5bf6bd8ea8..acc97d78eba 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import json
-import logging
from datetime import datetime
from unittest import mock
@@ -27,7 +26,7 @@ from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from airflow.models import DagRun
-from airflow.models.deadline import Deadline, DeadlineAlert, _fetch_from_db
+from airflow.models.deadline import Deadline, _fetch_from_db
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.utils.state import DagRunState
@@ -40,12 +39,6 @@ RUN_ID = 1
TEST_CALLBACK_KWARGS = {"to": "[email protected]"}
TEST_CALLBACK_PATH = f"{__name__}.test_callback"
-UNIMPORTABLE_DOT_PATH = "valid.but.nonexistent.path"
-
-
-def test_callback():
- """An empty Callable to use for the callback tests in this suite."""
- pass
def _clean_db():
@@ -144,50 +137,6 @@ class TestDeadline:
)
-class TestDeadlineAlert:
- @pytest.mark.parametrize(
- "callback_value, expected_path",
- [
- pytest.param(test_callback, TEST_CALLBACK_PATH,
id="valid_callable"),
- pytest.param(TEST_CALLBACK_PATH, TEST_CALLBACK_PATH,
id="valid_path_string"),
- pytest.param(lambda x: x, None, id="lambda_function"),
- pytest.param(TEST_CALLBACK_PATH + " ", TEST_CALLBACK_PATH,
id="path_with_whitespace"),
- pytest.param(UNIMPORTABLE_DOT_PATH, UNIMPORTABLE_DOT_PATH,
id="valid_format_not_importable"),
- ],
- )
- def test_get_callback_path_happy_cases(self, callback_value,
expected_path):
- path = DeadlineAlert.get_callback_path(callback_value)
- if expected_path is None:
- assert path.endswith("<lambda>")
- else:
- assert path == expected_path
-
- @pytest.mark.parametrize(
- "callback_value, error_type",
- [
- pytest.param(42, ImportError, id="not_a_string"),
- pytest.param("", ImportError, id="empty_string"),
- pytest.param("os.path", AttributeError, id="non_callable_module"),
- ],
- )
- def test_get_callback_path_error_cases(self, callback_value, error_type):
- expected_message = ""
- if error_type is ImportError:
- expected_message = "doesn't look like a valid dot path."
- elif error_type is AttributeError:
- expected_message = "is not callable."
-
- with pytest.raises(error_type, match=expected_message):
- DeadlineAlert.get_callback_path(callback_value)
-
- def test_log_unimportable_but_properly_formatted_callback(self, caplog):
- with caplog.at_level(logging.DEBUG):
- path = DeadlineAlert.get_callback_path(UNIMPORTABLE_DOT_PATH)
-
- assert "could not be imported" in caplog.text
- assert path == UNIMPORTABLE_DOT_PATH
-
-
@pytest.mark.db_test
class TestCalculatedDeadlineDatabaseCalls:
@staticmethod
@@ -265,7 +214,7 @@ class TestCalculatedDeadlineDatabaseCalls:
)
@mock.patch("sqlalchemy.orm.Session")
def test_fetch_from_db_error_cases(
- self, mock_session, use_valid_conditions, scalar_side_effect,
expected_error, expected_message, caplog
+ self, mock_session, use_valid_conditions, scalar_side_effect,
expected_error, expected_message
):
"""Test database access error handling."""
model_reference = DagRun.logical_date
@@ -274,11 +223,8 @@ class TestCalculatedDeadlineDatabaseCalls:
# Configure mock session
mock_session.scalar.side_effect = scalar_side_effect
- with caplog.at_level(logging.ERROR):
- with pytest.raises(expected_error, match=expected_message):
- _fetch_from_db(model_reference, session=mock_session,
**conditions)
- if expected_message:
- assert expected_message in caplog.text
+ with pytest.raises(expected_error, match=expected_message):
+ _fetch_from_db(model_reference, session=mock_session, **conditions)
@pytest.mark.parametrize(
"reference, expected_column",
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 87215f5531a..8b25e6a8b39 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -50,13 +50,13 @@ from airflow.exceptions import (
ParamValidationError,
TaskNotFound,
)
-from airflow.models.deadline import DeadlineAlert
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
from airflow.sdk.definitions._internal.node import validate_key
from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet
from airflow.sdk.definitions.asset import AssetAll, BaseAsset
from airflow.sdk.definitions.context import Context
+from airflow.sdk.definitions.deadline import DeadlineAlert
from airflow.sdk.definitions.param import DagParam, ParamsDict
from airflow.timetables.base import Timetable
from airflow.timetables.simple import (
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py
index aa3d643a975..d1206940635 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -16,7 +16,75 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
+import logging
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Callable
+
+from airflow.utils.module_loading import import_string, is_valid_dotpath
+
+if TYPE_CHECKING:
+ from airflow.models.deadline import DeadlineReferenceType
+
+logger = logging.getLogger(__name__)
+
+
+class DeadlineAlert:
+ """Store Deadline values needed to calculate the need-by timestamp and the
callback information."""
+
+ def __init__(
+ self,
+ reference: DeadlineReferenceType,
+ interval: timedelta,
+ callback: Callable | str,
+ callback_kwargs: dict | None = None,
+ ):
+ self.reference = reference
+ self.interval = interval
+ self.callback_kwargs = callback_kwargs
+ self.callback = self.get_callback_path(callback)
+
+ @staticmethod
+ def get_callback_path(_callback: str | Callable) -> str:
+ if callable(_callback):
+ # Get the reference path to the callable in the form
`airflow.models.deadline.get_from_db`
+ return f"{_callback.__module__}.{_callback.__qualname__}"
+
+ if not isinstance(_callback, str) or not
is_valid_dotpath(_callback.strip()):
+ raise ImportError(f"`{_callback}` doesn't look like a valid dot
path.")
+
+ stripped_callback = _callback.strip()
+
+ try:
+ # The provided callback is a string which appears to be a valid
dotpath, attempt to import it.
+ callback = import_string(stripped_callback)
+ except ImportError as e:
+ # Logging here instead of failing because it is possible that the
code for the callable
+ # exists somewhere other than on the DAG processor. We are making
a best effort to validate,
+ # but can't rule out that it may be available at runtime even if
it can not be imported here.
+ logger.debug(
+ "Callback %s is formatted like a callable dotpath, but could
not be imported.\n%s",
+ stripped_callback,
+ e,
+ )
+ return stripped_callback
+
+ # If we get this far then the input is a string which can be imported,
check if it is a callable.
+ if not callable(callback):
+ raise AttributeError(f"Provided callback {callback} is not
callable.")
+
+ return stripped_callback
+
+ def serialize_deadline_alert(self):
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ return BaseSerialization.serialize(
+ {
+ "reference": self.reference,
+ "interval": self.interval,
+ "callback": self.callback,
+ "callback_kwargs": self.callback_kwargs,
+ }
+ )
class DeadlineReference:
@@ -58,9 +126,9 @@ class DeadlineReference:
from airflow.models.deadline import ReferenceModels
- DAGRUN_LOGICAL_DATE: ReferenceModels.BaseDeadlineReference =
ReferenceModels.DagRunLogicalDateDeadline()
- DAGRUN_QUEUED_AT: ReferenceModels.BaseDeadlineReference =
ReferenceModels.DagRunQueuedAtDeadline()
+ DAGRUN_LOGICAL_DATE: DeadlineReferenceType =
ReferenceModels.DagRunLogicalDateDeadline()
+ DAGRUN_QUEUED_AT: DeadlineReferenceType =
ReferenceModels.DagRunQueuedAtDeadline()
@classmethod
- def FIXED_DATETIME(cls, datetime: datetime) ->
ReferenceModels.BaseDeadlineReference:
+ def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
diff --git a/task-sdk/tests/task_sdk/definitions/test_deadline.py b/task-sdk/tests/task_sdk/definitions/test_deadline.py
index 30d9548d64e..b46d212a7fb 100644
--- a/task-sdk/tests/task_sdk/definitions/test_deadline.py
+++ b/task-sdk/tests/task_sdk/definitions/test_deadline.py
@@ -22,10 +22,13 @@ import pytest
from task_sdk.definitions.test_dag import DEFAULT_DATE
from airflow.models.deadline import ReferenceModels
-from airflow.sdk.definitions.deadline import DeadlineReference
+from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
DAG_ID = "dag_id_1"
+TEST_CALLBACK_PATH = f"{__name__}.test_callback"
+UNIMPORTABLE_DOT_PATH = "valid.but.nonexistent.path"
+
REFERENCE_TYPES = [
pytest.param(DeadlineReference.DAGRUN_LOGICAL_DATE, id="logical_date"),
pytest.param(DeadlineReference.DAGRUN_QUEUED_AT, id="queued_at"),
@@ -33,6 +36,48 @@ REFERENCE_TYPES = [
]
+def test_callback():
+ """An empty Callable to use for the callback tests in this suite."""
+ pass
+
+
+class TestDeadlineAlert:
+ @pytest.mark.parametrize(
+ "callback_value, expected_path",
+ [
+ pytest.param(test_callback, TEST_CALLBACK_PATH,
id="valid_callable"),
+ pytest.param(TEST_CALLBACK_PATH, TEST_CALLBACK_PATH,
id="valid_path_string"),
+ pytest.param(lambda x: x, None, id="lambda_function"),
+ pytest.param(TEST_CALLBACK_PATH + " ", TEST_CALLBACK_PATH,
id="path_with_whitespace"),
+ pytest.param(UNIMPORTABLE_DOT_PATH, UNIMPORTABLE_DOT_PATH,
id="valid_format_not_importable"),
+ ],
+ )
+ def test_get_callback_path_happy_cases(self, callback_value,
expected_path):
+ path = DeadlineAlert.get_callback_path(callback_value)
+ if expected_path is None:
+ assert path.endswith("<lambda>")
+ else:
+ assert path == expected_path
+
+ @pytest.mark.parametrize(
+ "callback_value, error_type",
+ [
+ pytest.param(42, ImportError, id="not_a_string"),
+ pytest.param("", ImportError, id="empty_string"),
+ pytest.param("os.path", AttributeError, id="non_callable_module"),
+ ],
+ )
+ def test_get_callback_path_error_cases(self, callback_value, error_type):
+ expected_message = ""
+ if error_type is ImportError:
+ expected_message = "doesn't look like a valid dot path."
+ elif error_type is AttributeError:
+ expected_message = "is not callable."
+
+ with pytest.raises(error_type, match=expected_message):
+ DeadlineAlert.get_callback_path(callback_value)
+
+
class TestDeadlineReference:
@pytest.mark.parametrize("reference", REFERENCE_TYPES)
def test_deadline_evaluate_with(self, reference):