ferruzzi commented on code in PR #53951:
URL: https://github.com/apache/airflow/pull/53951#discussion_r2255418430
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -134,11 +136,96 @@ def deserialize_deadline_alert(cls, encoded_data: dict)
-> DeadlineAlert:
return cls(
reference=reference,
interval=timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
- callback=data[DeadlineAlertFields.CALLBACK], # Keep as string path
- callback_kwargs=data[DeadlineAlertFields.CALLBACK_KWARGS],
+ callback=cast("Callback",
deserialize(data[DeadlineAlertFields.CALLBACK])),
)
+class Callback(ABC):
+ """
+ Base class for deadline alert callbacks.
Review Comment:
```suggestion
Base class for Deadline Alert callbacks.
```
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -134,11 +136,96 @@ def deserialize_deadline_alert(cls, encoded_data: dict)
-> DeadlineAlert:
return cls(
reference=reference,
interval=timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
- callback=data[DeadlineAlertFields.CALLBACK], # Keep as string path
- callback_kwargs=data[DeadlineAlertFields.CALLBACK_KWARGS],
+ callback=cast("Callback",
deserialize(data[DeadlineAlertFields.CALLBACK])),
)
+class Callback(ABC):
Review Comment:
Should this be DeadlineCallback or similar if we're not using as a metaclass
for all callbacks?
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -193,18 +195,30 @@ def prune_deadlines(cls, *, session: Session, conditions:
dict[Column, Any]) ->
return deleted_count
+ @cached_property
+ def callback(self) -> Callback:
+ return cast("Callback", deserialize(self._callback))
+
def handle_miss(self, session: Session):
- """Handle a missed deadline by creating a trigger to run the
callback."""
- # TODO: check to see if the callback is meant to run in triggerer or
executor. For now, the code below assumes it's for the triggerer
- callback_trigger = DeadlineCallbackTrigger(
- callback_path=self.callback,
- callback_kwargs=self.callback_kwargs,
- )
+ """Handle a missed deadline by running the callback in the appropriate
host and updating the `callback_state`."""
+ from airflow.sdk.definitions.deadline import AsyncCallback,
SyncCallback
+
+ if isinstance(self.callback, AsyncCallback):
+ callback_trigger = DeadlineCallbackTrigger(
+ callback_path=self.callback.path,
+ callback_kwargs=self.callback.kwargs,
+ )
+ trigger_orm = Trigger.from_object(callback_trigger)
+ session.add(trigger_orm)
+ session.flush()
+ self.trigger = trigger_orm
Review Comment:
Previously this was `self.trigger_id = trigger_orm.id`, is this change
intentional?
##########
airflow-core/src/airflow/triggers/deadline.py:
##########
@@ -40,8 +40,8 @@ def __init__(self, callback_path: str, callback_kwargs:
dict[str, Any] | None =
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
- f"{type(self).__module__}.{type(self).__qualname__}",
- {"callback_path": self.callback_path, "callback_kwargs":
self.callback_kwargs},
+ qualname(self),
Review Comment:
Nice find!
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -90,8 +91,7 @@ def test_add_deadline(self, dagrun, session):
assert session.query(Deadline).count() == 0
deadline_orm = Deadline(
deadline_time=DEFAULT_DATE,
- callback=TEST_CALLBACK_PATH,
- callback_kwargs=TEST_CALLBACK_KWARGS,
+ callback=TEST_ASYNC_CALLBACK,
dag_id=DAG_ID,
dagrun_id=dagrun.id,
)
Review Comment:
This looks nice in use. Good idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]