kaxil commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877244580


##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
         def deserialize_reference(cls, reference_data: dict):
             return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+    class SerializedCustomReference(SerializedBaseDeadlineReference):
+        """
+        Wrapper for custom deadline references.
+
+        This class dynamically delegates to the wrapped reference for 
required_kwargs and evaluation logic.
+        """
+
+        def __init__(self, inner_ref):
+            self.inner_ref = inner_ref
+
+        @property
+        def reference_name(self) -> str:
+            return self.inner_ref.reference_name
+
+        def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+            """Pass through all kwargs to inner reference without filtering."""
+            deadline = self.inner_ref._evaluate_with(session=session, **kwargs)

Review Comment:
   I think this bypasses the base `evaluate_with()` validation/filtering path. 
`SerializedBaseDeadlineReference.evaluate_with()` validates `required_kwargs` 
and drops extras before calling `_evaluate_with()`, but here we call 
`inner_ref._evaluate_with()` directly with all kwargs. That can break custom 
refs with strict signatures (or missing keys) at runtime. Could we mirror the 
base filtering/validation logic here and then delegate with filtered kwargs?



##########
airflow-core/src/airflow/serialization/encoders.py:
##########
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
     from airflow.sdk.serde import serialize
 
     return {
-        "reference": d.reference.serialize_reference(),
+        "reference": encode_deadline_reference(d.reference),
         "interval": d.interval.total_seconds(),
         "callback": serialize(d.callback),
     }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+    "airflow.sdk.definitions.deadline",
+    "airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
     """
     Encode a deadline reference.
 
+    For custom (non-builtin) deadline references, includes the class path
+    so the decoder can import the user's class at runtime.
+
     :meta private:
     """
-    return ref.serialize_reference()
+    from airflow._shared.module_loading import qualname
+
+    serialized = ref.serialize_reference()
+
+    # Custom types (not built-in) need __class_path so the decoder can import 
them.
+    # Unlike built-in types which are looked up in SerializedReferenceModels,
+    # custom types are discovered via import_string(__class_path) at 
deserialization time.
+    module = type(ref).__module__
+    if module not in _BUILTIN_DEADLINE_MODULES:

Review Comment:
   Potential compatibility risk here: refs from `airflow.models.deadline` are 
no longer treated as builtins, so they now rely on `__class_path` import during 
decode. If that path resolves to nested classes (e.g. under `ReferenceModels`), 
`import_string()` currently supports only top-level attributes, which can fail 
deserialization. Should we either include `airflow.models.deadline` in builtins 
or make import resolution nested-class aware?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to