This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 43cd627100d AIP-86 - Move DeadlineAlert into the SDK since it is direct user-facing code (#51326)
43cd627100d is described below

commit 43cd627100da2a8dbad0bf55af308a299976d1c8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Fri Jun 6 13:18:42 2025 -0700

    AIP-86 - Move DeadlineAlert into the SDK since it is direct user-facing code (#51326)
    
    * AIP-86 - Move DeadlineAlert into the SDK since it is direct user-facing code
    
    * Remove caplog from Deadline unit tests per this email thread: https://lists.apache.org/thread/f7t5zl6t3t0s89rt37orfcv4966crojt
    
    ---------
    
    Co-authored-by: Niko Oliveira <[email protected]>
---
 airflow-core/src/airflow/models/deadline.py        | 64 +-----------------
 .../airflow/serialization/serialized_objects.py    |  2 +-
 airflow-core/tests/unit/models/test_deadline.py    | 62 ++----------------
 task-sdk/src/airflow/sdk/definitions/dag.py        |  2 +-
 task-sdk/src/airflow/sdk/definitions/deadline.py   | 76 ++++++++++++++++++++--
 .../tests/task_sdk/definitions/test_deadline.py    | 47 ++++++++++++-
 6 files changed, 127 insertions(+), 126 deletions(-)

diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index a769ea0b1d3..dfa4bb104c0 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 import logging
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any, Callable
+from datetime import datetime
+from typing import TYPE_CHECKING, Any
 
 import sqlalchemy_jsonfield
 import uuid6
@@ -32,14 +32,12 @@ from sqlalchemy_utils import UUIDType
 from airflow.models.base import Base, StringID
 from airflow.settings import json
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.module_loading import import_string, is_valid_dotpath
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
-    from airflow.sdk.definitions.deadline import DeadlineReference
 
 logger = logging.getLogger(__name__)
 
@@ -168,63 +166,7 @@ class ReferenceModels:
             return _fetch_from_db(DagRun.queued_at, **kwargs)
 
 
-class DeadlineAlert:
-    """Store Deadline values needed to calculate the need-by timestamp and the callback information."""
-
-    def __init__(
-        self,
-        reference: DeadlineReference,
-        interval: timedelta,
-        callback: Callable | str,
-        callback_kwargs: dict | None = None,
-    ):
-        self.reference = reference
-        self.interval = interval
-        self.callback_kwargs = callback_kwargs
-        self.callback = self.get_callback_path(callback)
-
-    @staticmethod
-    def get_callback_path(_callback: str | Callable) -> str:
-        if callable(_callback):
-            # Get the reference path to the callable in the form `airflow.models.deadline.get_from_db`
-            return f"{_callback.__module__}.{_callback.__qualname__}"
-
-        if not isinstance(_callback, str) or not is_valid_dotpath(_callback.strip()):
-            raise ImportError(f"`{_callback}` doesn't look like a valid dot path.")
-
-        stripped_callback = _callback.strip()
-
-        try:
-            # The provided callback is a string which appears to be a valid dotpath, attempt to import it.
-            callback = import_string(stripped_callback)
-        except ImportError as e:
-            # Logging here instead of failing because it is possible that the code for the callable
-            # exists somewhere other than on the DAG processor. We are making a best effort to validate,
-            # but can't rule out that it may be available at runtime even if it can not be imported here.
-            logger.debug(
-                "Callback %s is formatted like a callable dotpath, but could not be imported.\n%s",
-                stripped_callback,
-                e,
-            )
-            return stripped_callback
-
-        # If we get this far then the input is a string which can be imported, check if it is a callable.
-        if not callable(callback):
-            raise AttributeError(f"Provided callback {callback} is not callable.")
-
-        return stripped_callback
-
-    def serialize_deadline_alert(self):
-        from airflow.serialization.serialized_objects import BaseSerialization
-
-        return BaseSerialization.serialize(
-            {
-                "reference": self.reference,
-                "interval": self.interval,
-                "callback": self.callback,
-                "callback_kwargs": self.callback_kwargs,
-            }
-        )
+DeadlineReferenceType = ReferenceModels.BaseDeadlineReference
 
 
 @provide_session
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index ed058ebb090..72962eeb444 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -44,7 +44,6 @@ from airflow.exceptions import AirflowException, SerializationError, TaskDeferre
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.connection import Connection
 from airflow.models.dag import DAG, _get_model_data_interval
-from airflow.models.deadline import DeadlineAlert
 from airflow.models.expandinput import (
     create_expand_input,
 )
@@ -66,6 +65,7 @@ from airflow.sdk.definitions.asset import (
     AssetWatcher,
     BaseAsset,
 )
+from airflow.sdk.definitions.deadline import DeadlineAlert
 from airflow.sdk.definitions.mappedoperator import MappedOperator
 from airflow.sdk.definitions.param import Param, ParamsDict
 from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py
index a5bf6bd8ea8..acc97d78eba 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import json
-import logging
 from datetime import datetime
 from unittest import mock
 
@@ -27,7 +26,7 @@ from sqlalchemy import select
 from sqlalchemy.exc import SQLAlchemyError
 
 from airflow.models import DagRun
-from airflow.models.deadline import Deadline, DeadlineAlert, _fetch_from_db
+from airflow.models.deadline import Deadline, _fetch_from_db
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.deadline import DeadlineReference
 from airflow.utils.state import DagRunState
@@ -40,12 +39,6 @@ RUN_ID = 1
 
 TEST_CALLBACK_KWARGS = {"to": "[email protected]"}
 TEST_CALLBACK_PATH = f"{__name__}.test_callback"
-UNIMPORTABLE_DOT_PATH = "valid.but.nonexistent.path"
-
-
-def test_callback():
-    """An empty Callable to use for the callback tests in this suite."""
-    pass
 
 
 def _clean_db():
@@ -144,50 +137,6 @@ class TestDeadline:
         )
 
 
-class TestDeadlineAlert:
-    @pytest.mark.parametrize(
-        "callback_value, expected_path",
-        [
-            pytest.param(test_callback, TEST_CALLBACK_PATH, id="valid_callable"),
-            pytest.param(TEST_CALLBACK_PATH, TEST_CALLBACK_PATH, id="valid_path_string"),
-            pytest.param(lambda x: x, None, id="lambda_function"),
-            pytest.param(TEST_CALLBACK_PATH + "  ", TEST_CALLBACK_PATH, id="path_with_whitespace"),
-            pytest.param(UNIMPORTABLE_DOT_PATH, UNIMPORTABLE_DOT_PATH, id="valid_format_not_importable"),
-        ],
-    )
-    def test_get_callback_path_happy_cases(self, callback_value, expected_path):
-        path = DeadlineAlert.get_callback_path(callback_value)
-        if expected_path is None:
-            assert path.endswith("<lambda>")
-        else:
-            assert path == expected_path
-
-    @pytest.mark.parametrize(
-        "callback_value, error_type",
-        [
-            pytest.param(42, ImportError, id="not_a_string"),
-            pytest.param("", ImportError, id="empty_string"),
-            pytest.param("os.path", AttributeError, id="non_callable_module"),
-        ],
-    )
-    def test_get_callback_path_error_cases(self, callback_value, error_type):
-        expected_message = ""
-        if error_type is ImportError:
-            expected_message = "doesn't look like a valid dot path."
-        elif error_type is AttributeError:
-            expected_message = "is not callable."
-
-        with pytest.raises(error_type, match=expected_message):
-            DeadlineAlert.get_callback_path(callback_value)
-
-    def test_log_unimportable_but_properly_formatted_callback(self, caplog):
-        with caplog.at_level(logging.DEBUG):
-            path = DeadlineAlert.get_callback_path(UNIMPORTABLE_DOT_PATH)
-
-            assert "could not be imported" in caplog.text
-            assert path == UNIMPORTABLE_DOT_PATH
-
-
 @pytest.mark.db_test
 class TestCalculatedDeadlineDatabaseCalls:
     @staticmethod
@@ -265,7 +214,7 @@ class TestCalculatedDeadlineDatabaseCalls:
     )
     @mock.patch("sqlalchemy.orm.Session")
     def test_fetch_from_db_error_cases(
-        self, mock_session, use_valid_conditions, scalar_side_effect, expected_error, expected_message, caplog
+        self, mock_session, use_valid_conditions, scalar_side_effect, expected_error, expected_message
     ):
         """Test database access error handling."""
         model_reference = DagRun.logical_date
@@ -274,11 +223,8 @@ class TestCalculatedDeadlineDatabaseCalls:
         # Configure mock session
         mock_session.scalar.side_effect = scalar_side_effect
 
-        with caplog.at_level(logging.ERROR):
-            with pytest.raises(expected_error, match=expected_message):
-                _fetch_from_db(model_reference, session=mock_session, **conditions)
-            if expected_message:
-                assert expected_message in caplog.text
+        with pytest.raises(expected_error, match=expected_message):
+            _fetch_from_db(model_reference, session=mock_session, **conditions)
 
     @pytest.mark.parametrize(
         "reference, expected_column",
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 87215f5531a..8b25e6a8b39 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -50,13 +50,13 @@ from airflow.exceptions import (
     ParamValidationError,
     TaskNotFound,
 )
-from airflow.models.deadline import DeadlineAlert
 from airflow.sdk.bases.operator import BaseOperator
 from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
 from airflow.sdk.definitions._internal.node import validate_key
 from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet
 from airflow.sdk.definitions.asset import AssetAll, BaseAsset
 from airflow.sdk.definitions.context import Context
+from airflow.sdk.definitions.deadline import DeadlineAlert
 from airflow.sdk.definitions.param import DagParam, ParamsDict
 from airflow.timetables.base import Timetable
 from airflow.timetables.simple import (
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py
index aa3d643a975..d1206940635 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -16,7 +16,75 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
+import logging
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Callable
+
+from airflow.utils.module_loading import import_string, is_valid_dotpath
+
+if TYPE_CHECKING:
+    from airflow.models.deadline import DeadlineReferenceType
+
+logger = logging.getLogger(__name__)
+
+
+class DeadlineAlert:
+    """Store Deadline values needed to calculate the need-by timestamp and the callback information."""
+
+    def __init__(
+        self,
+        reference: DeadlineReferenceType,
+        interval: timedelta,
+        callback: Callable | str,
+        callback_kwargs: dict | None = None,
+    ):
+        self.reference = reference
+        self.interval = interval
+        self.callback_kwargs = callback_kwargs
+        self.callback = self.get_callback_path(callback)
+
+    @staticmethod
+    def get_callback_path(_callback: str | Callable) -> str:
+        if callable(_callback):
+            # Get the reference path to the callable in the form `airflow.models.deadline.get_from_db`
+            return f"{_callback.__module__}.{_callback.__qualname__}"
+
+        if not isinstance(_callback, str) or not is_valid_dotpath(_callback.strip()):
+            raise ImportError(f"`{_callback}` doesn't look like a valid dot path.")
+
+        stripped_callback = _callback.strip()
+
+        try:
+            # The provided callback is a string which appears to be a valid dotpath, attempt to import it.
+            callback = import_string(stripped_callback)
+        except ImportError as e:
+            # Logging here instead of failing because it is possible that the code for the callable
+            # exists somewhere other than on the DAG processor. We are making a best effort to validate,
+            # but can't rule out that it may be available at runtime even if it can not be imported here.
+            logger.debug(
+                "Callback %s is formatted like a callable dotpath, but could not be imported.\n%s",
+                stripped_callback,
+                e,
+            )
+            return stripped_callback
+
+        # If we get this far then the input is a string which can be imported, check if it is a callable.
+        if not callable(callback):
+            raise AttributeError(f"Provided callback {callback} is not callable.")
+
+        return stripped_callback
+
+    def serialize_deadline_alert(self):
+        from airflow.serialization.serialized_objects import BaseSerialization
+
+        return BaseSerialization.serialize(
+            {
+                "reference": self.reference,
+                "interval": self.interval,
+                "callback": self.callback,
+                "callback_kwargs": self.callback_kwargs,
+            }
+        )
 
 
 class DeadlineReference:
@@ -58,9 +126,9 @@ class DeadlineReference:
 
     from airflow.models.deadline import ReferenceModels
 
-    DAGRUN_LOGICAL_DATE: ReferenceModels.BaseDeadlineReference = ReferenceModels.DagRunLogicalDateDeadline()
-    DAGRUN_QUEUED_AT: ReferenceModels.BaseDeadlineReference = ReferenceModels.DagRunQueuedAtDeadline()
+    DAGRUN_LOGICAL_DATE: DeadlineReferenceType = ReferenceModels.DagRunLogicalDateDeadline()
+    DAGRUN_QUEUED_AT: DeadlineReferenceType = ReferenceModels.DagRunQueuedAtDeadline()
 
     @classmethod
-    def FIXED_DATETIME(cls, datetime: datetime) -> ReferenceModels.BaseDeadlineReference:
+    def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
         return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
diff --git a/task-sdk/tests/task_sdk/definitions/test_deadline.py b/task-sdk/tests/task_sdk/definitions/test_deadline.py
index 30d9548d64e..b46d212a7fb 100644
--- a/task-sdk/tests/task_sdk/definitions/test_deadline.py
+++ b/task-sdk/tests/task_sdk/definitions/test_deadline.py
@@ -22,10 +22,13 @@ import pytest
 from task_sdk.definitions.test_dag import DEFAULT_DATE
 
 from airflow.models.deadline import ReferenceModels
-from airflow.sdk.definitions.deadline import DeadlineReference
+from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
 
 DAG_ID = "dag_id_1"
 
+TEST_CALLBACK_PATH = f"{__name__}.test_callback"
+UNIMPORTABLE_DOT_PATH = "valid.but.nonexistent.path"
+
 REFERENCE_TYPES = [
     pytest.param(DeadlineReference.DAGRUN_LOGICAL_DATE, id="logical_date"),
     pytest.param(DeadlineReference.DAGRUN_QUEUED_AT, id="queued_at"),
@@ -33,6 +36,48 @@ REFERENCE_TYPES = [
 ]
 
 
+def test_callback():
+    """An empty Callable to use for the callback tests in this suite."""
+    pass
+
+
+class TestDeadlineAlert:
+    @pytest.mark.parametrize(
+        "callback_value, expected_path",
+        [
+            pytest.param(test_callback, TEST_CALLBACK_PATH, id="valid_callable"),
+            pytest.param(TEST_CALLBACK_PATH, TEST_CALLBACK_PATH, id="valid_path_string"),
+            pytest.param(lambda x: x, None, id="lambda_function"),
+            pytest.param(TEST_CALLBACK_PATH + "  ", TEST_CALLBACK_PATH, id="path_with_whitespace"),
+            pytest.param(UNIMPORTABLE_DOT_PATH, UNIMPORTABLE_DOT_PATH, id="valid_format_not_importable"),
+        ],
+    )
+    def test_get_callback_path_happy_cases(self, callback_value, expected_path):
+        path = DeadlineAlert.get_callback_path(callback_value)
+        if expected_path is None:
+            assert path.endswith("<lambda>")
+        else:
+            assert path == expected_path
+
+    @pytest.mark.parametrize(
+        "callback_value, error_type",
+        [
+            pytest.param(42, ImportError, id="not_a_string"),
+            pytest.param("", ImportError, id="empty_string"),
+            pytest.param("os.path", AttributeError, id="non_callable_module"),
+        ],
+    )
+    def test_get_callback_path_error_cases(self, callback_value, error_type):
+        expected_message = ""
+        if error_type is ImportError:
+            expected_message = "doesn't look like a valid dot path."
+        elif error_type is AttributeError:
+            expected_message = "is not callable."
+
+        with pytest.raises(error_type, match=expected_message):
+            DeadlineAlert.get_callback_path(callback_value)
+
+
 class TestDeadlineReference:
     @pytest.mark.parametrize("reference", REFERENCE_TYPES)
     def test_deadline_evaluate_with(self, reference):

Reply via email to