ferruzzi commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2915298945


##########
airflow-core/src/airflow/models/deadline_alert.py:
##########
@@ -86,11 +86,10 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
     @property
     def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
         """Return the deserialized reference class."""
-        if "__class_path" in self.reference:
-            return SerializedReferenceModels.SerializedCustomReference
-        return SerializedReferenceModels.get_reference_class(
-            self.reference[SerializedReferenceModels.REFERENCE_TYPE_FIELD]
-        )
+        ref_name = self.reference.get(SerializedReferenceModels.REFERENCE_TYPE_FIELD)
+        if ref_name and SerializedReferenceModels.is_builtin_reference(ref_name):
+            return SerializedReferenceModels.get_reference_class(ref_name)
+        return SerializedReferenceModels.SerializedCustomReference

Review Comment:
   Yeah, if nothing else, it's certainly easier to read this way.



##########
airflow-core/src/airflow/serialization/encoders.py:
##########
@@ -201,19 +201,43 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
     from airflow.sdk.serde import serialize
 
     return {
-        "reference": d.reference.serialize_reference(),
+        "reference": encode_deadline_reference(d.reference),
         "interval": d.interval.total_seconds(),
         "callback": serialize(d.callback),
     }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+    "airflow.sdk.definitions.deadline",
+    "airflow.serialization.definitions.deadline",
+    # Include airflow.models.deadline to treat core's deadline references as builtins.
+    # This is to maintain backcompat with 3.1.x custom refs that inherit from
+    # airflow.models.deadline.ReferenceModels.BaseDeadlineReference.
+    "airflow.models.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
     """
     Encode a deadline reference.
 
+    For custom (non-builtin) deadline references, includes the class path
+    so the decoder can import the user's class at runtime.
+
     :meta private:
     """
-    return ref.serialize_reference()
+    from airflow._shared.module_loading import qualname
+
+    serialized = ref.serialize_reference()
+
+    # Custom types (not built-in) need __class_path so the decoder can import them.
+    # Unlike built-in types which are looked up in SerializedReferenceModels,
+    # custom types are discovered via import_string(__class_path) at deserialization time.
+    module = type(ref).__module__
+    if module not in _BUILTIN_DEADLINE_MODULES:

Review Comment:
   Do we want to use the new helper here?
   
   
   ```suggestion
       if not SerializedReferenceModels.is_builtin_reference(type(ref).__name__):
   ```
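
To make the trade-off in this suggestion concrete, here is a minimal sketch contrasting the module-based check in the diff with a name-based check. It assumes `is_builtin_reference` amounts to a lookup by class name, which this thread does not show; `_BUILTIN_NAMES`, `ExampleBuiltinReference`, and both helper functions are hypothetical stand-ins, not Airflow code.

```python
# Module list copied from the diff above.
_BUILTIN_DEADLINE_MODULES = (
    "airflow.sdk.definitions.deadline",
    "airflow.serialization.definitions.deadline",
    "airflow.models.deadline",
)

# Assumption: the helper recognises built-ins by class name only.
# "ExampleBuiltinReference" is a made-up name used purely for illustration.
_BUILTIN_NAMES = {"ExampleBuiltinReference"}


def is_builtin_by_module(ref: object) -> bool:
    """Check used in the diff: where was the class defined?"""
    return type(ref).__module__ in _BUILTIN_DEADLINE_MODULES


def is_builtin_by_name(ref: object) -> bool:
    """Check in the suggestion, under the name-lookup assumption."""
    return type(ref).__name__ in _BUILTIN_NAMES


class ExampleBuiltinReference:
    """A user-defined class that happens to share a built-in's name."""


class MyCustomReference:
    """A user-defined class with a unique name."""


shadowing_ref = ExampleBuiltinReference()
custom_ref = MyCustomReference()
```

Under these assumptions the two checks agree on `custom_ref` but disagree on `shadowing_ref`: the module check treats it as custom, the name check treats it as built-in. Whether that case matters depends on how the real helper is implemented.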



##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########


Review Comment:
   Non-blocking, maybe future work: in hindsight, `get_reference_class`
   already does most of what the new `is_builtin_reference` does. I bet we
   could combine them, but that might be a breaking change.
   
   If `get_reference_class` returned None instead of raising an exception,
   then I think `is_builtin_reference` just becomes
   `return cls.get_reference_class(name) is not None`, because a built-in name
   would return its class. Or leave it raising and use try/except in
   `is_builtin_reference`... but maybe not. Something to consider for later.
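
A rough sketch of the two options described above, for whoever picks this up later. Everything here is a hypothetical stand-in: the registry `_BUILTIN_REFERENCES`, the placeholder class, the `_v1`/`_v2` function names, and the `ValueError` raised for unknown names are assumptions, not the actual `SerializedReferenceModels` implementation.

```python
from typing import Dict, Optional


class ExampleBuiltinReference:
    """Placeholder for a built-in reference class (made-up name)."""


# Hypothetical registry of built-in reference classes, keyed by class name.
_BUILTIN_REFERENCES: Dict[str, type] = {
    ExampleBuiltinReference.__name__: ExampleBuiltinReference,
}


# Option 1: the lookup returns None for unknown names (behaviour change).
def get_reference_class_or_none(name: str) -> Optional[type]:
    return _BUILTIN_REFERENCES.get(name)


def is_builtin_reference_v1(name: str) -> bool:
    return get_reference_class_or_none(name) is not None


# Option 2: the lookup keeps raising; the helper catches the exception.
def get_reference_class(name: str) -> type:
    try:
        return _BUILTIN_REFERENCES[name]
    except KeyError:
        # Assumed exception type; the real method may raise something else.
        raise ValueError(f"No reference class found with name: {name}") from None


def is_builtin_reference_v2(name: str) -> bool:
    try:
        get_reference_class(name)
    except ValueError:
        return False
    return True
```

Option 1 is shorter but changes what callers of the lookup see for unknown names, which is the possible breaking change. Option 2 keeps the existing contract at the cost of using an exception for control flow.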



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
