amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870619189
##########
airflow-core/src/airflow/serialization/definitions/deadline.py:
##########
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+ class SerializedCustomReference(SerializedBaseDeadlineReference):
+ """Wrapper for custom deadline references."""
+
+ def __init__(self, inner_ref):
+ self.inner_ref = inner_ref
+
+ @property
+ def reference_name(self) -> str:
+ return self.inner_ref.reference_name
+
+ def evaluate_with(self, *, session: Session, interval: timedelta,
**kwargs: Any) -> datetime | None:
+ filtered_kwargs = {k: v for k, v in kwargs.items() if k in
self.required_kwargs}
Review Comment:
Good catch, handled in [comments from
kaxil](https://github.com/apache/airflow/pull/61461/commits/f78d6235c6400934175912142e29eafaf59421a9)
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -118,69 +223,64 @@ class TYPES:
# Deadlines that should be created when the DagRun is created.
DAGRUN_CREATED: DeadlineReferenceTypes = (
- ReferenceModels.DagRunLogicalDateDeadline,
- ReferenceModels.FixedDatetimeDeadline,
- ReferenceModels.AverageRuntimeDeadline,
+ DagRunLogicalDateDeadline,
+ FixedDatetimeDeadline,
+ AverageRuntimeDeadline,
)
# Deadlines that should be created when the DagRun is queued.
- DAGRUN_QUEUED: DeadlineReferenceTypes =
(ReferenceModels.DagRunQueuedAtDeadline,)
+ DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
# All DagRun-related deadline types.
DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
- from airflow.models.deadline import ReferenceModels
-
- DAGRUN_LOGICAL_DATE: DeadlineReferenceType =
ReferenceModels.DagRunLogicalDateDeadline()
- DAGRUN_QUEUED_AT: DeadlineReferenceType =
ReferenceModels.DagRunQueuedAtDeadline()
+ DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+ DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
@classmethod
def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None)
-> DeadlineReferenceType:
if max_runs == 0:
- max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+ max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
if min_runs is None:
min_runs = max_runs
- return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+ return AverageRuntimeDeadline(max_runs, min_runs)
@classmethod
- def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
- return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+ def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+ return FixedDatetimeDeadline(dt)
# TODO: Remove this once other deadline types exist.
# This is a temporary reference type used only in tests to verify that
# dag.has_dagrun_deadline() returns false if the dag has a non-dagrun
deadline type.
# It should be replaced with a real non-dagrun deadline type when one is
available.
_TEMPORARY_TEST_REFERENCE = type(
"TemporaryTestDeadlineForTypeChecking",
- (DeadlineReferenceType,),
- {"_evaluate_with": lambda self, **kwargs: datetime.now()},
+ (BaseDeadlineReference,),
+ {"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD:
"TemporaryTestDeadlineForTypeChecking"}},
)()
@classmethod
def register_custom_reference(
cls,
- reference_class: type[ReferenceModels.BaseDeadlineReference],
+ reference_class: type[BaseDeadlineReference],
deadline_reference_type: DeadlineReferenceTypes | None = None,
- ) -> type[ReferenceModels.BaseDeadlineReference]:
+ ) -> type[BaseDeadlineReference]:
"""
Register a custom deadline reference class.
:param reference_class: The custom reference class inheriting from
BaseDeadlineReference
:param deadline_reference_type: A DeadlineReference.TYPES for when the
deadline should be evaluated ("DAGRUN_CREATED",
"DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
"""
- from airflow.models.deadline import ReferenceModels
-
# Default to DAGRUN_CREATED if no deadline_reference_type specified
if deadline_reference_type is None:
deadline_reference_type = cls.TYPES.DAGRUN_CREATED
# Validate the reference class inherits from BaseDeadlineReference
- if not issubclass(reference_class,
ReferenceModels.BaseDeadlineReference):
+ if not issubclass(reference_class, BaseDeadlineReference):
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/61461/commits/8938e471fec295bb04468c66150923730dd2d40b)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]