ferruzzi commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2796084465
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -203,29 +303,36 @@ def register_custom_reference(
def deadline_reference(
deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> Callable[[type[ReferenceModels.BaseDeadlineReference]],
type[ReferenceModels.BaseDeadlineReference]]:
+) -> Callable[[type[BaseDeadlineReference]], type[BaseDeadlineReference]]:
"""
Decorate a class to register a custom deadline reference.
Usage:
@deadline_reference()
- class MyCustomReference(ReferenceModels.BaseDeadlineReference):
+ class MyCustomReference(BaseDeadlineReference):
# By default, evaluate_with will be called when a new dagrun is
created.
def _evaluate_with(self, *, session: Session, **kwargs) ->
datetime:
- # Put your business logic here
+ # Put your business logic here (use deferred imports for Core
types)
+ from airflow.models import DagRun
return some_datetime
+ def serialize_reference(self) -> dict:
+ return {"reference_type": self.reference_name}
+
@deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
- class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
+ class MyQueuedRef(BaseDeadlineReference):
# Optionally, you can specify when you want it calculated by
providing a DeadlineReference.TYPES
def _evaluate_with(self, *, session: Session, **kwargs) ->
datetime:
# Put your business logic here
Review Comment:
I assume you want to put the same note about imports as you did in the other
example?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]