amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870589608
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -118,69 +223,64 @@ class TYPES:
# Deadlines that should be created when the DagRun is created.
DAGRUN_CREATED: DeadlineReferenceTypes = (
- ReferenceModels.DagRunLogicalDateDeadline,
- ReferenceModels.FixedDatetimeDeadline,
- ReferenceModels.AverageRuntimeDeadline,
+ DagRunLogicalDateDeadline,
+ FixedDatetimeDeadline,
+ AverageRuntimeDeadline,
)
# Deadlines that should be created when the DagRun is queued.
- DAGRUN_QUEUED: DeadlineReferenceTypes =
(ReferenceModels.DagRunQueuedAtDeadline,)
+ DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
# All DagRun-related deadline types.
DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
- from airflow.models.deadline import ReferenceModels
-
- DAGRUN_LOGICAL_DATE: DeadlineReferenceType =
ReferenceModels.DagRunLogicalDateDeadline()
- DAGRUN_QUEUED_AT: DeadlineReferenceType =
ReferenceModels.DagRunQueuedAtDeadline()
+ DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+ DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
@classmethod
def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None)
-> DeadlineReferenceType:
if max_runs == 0:
- max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+ max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
if min_runs is None:
min_runs = max_runs
- return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+ return AverageRuntimeDeadline(max_runs, min_runs)
@classmethod
- def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
- return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+ def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+ return FixedDatetimeDeadline(dt)
# TODO: Remove this once other deadline types exist.
# This is a temporary reference type used only in tests to verify that
# dag.has_dagrun_deadline() returns false if the dag has a non-dagrun
deadline type.
# It should be replaced with a real non-dagrun deadline type when one is
available.
_TEMPORARY_TEST_REFERENCE = type(
"TemporaryTestDeadlineForTypeChecking",
- (DeadlineReferenceType,),
- {"_evaluate_with": lambda self, **kwargs: datetime.now()},
+ (BaseDeadlineReference,),
+ {"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD:
"TemporaryTestDeadlineForTypeChecking"}},
)()
@classmethod
def register_custom_reference(
cls,
- reference_class: type[ReferenceModels.BaseDeadlineReference],
+ reference_class: type[BaseDeadlineReference],
deadline_reference_type: DeadlineReferenceTypes | None = None,
- ) -> type[ReferenceModels.BaseDeadlineReference]:
+ ) -> type[BaseDeadlineReference]:
"""
Register a custom deadline reference class.
:param reference_class: The custom reference class inheriting from
BaseDeadlineReference
:param deadline_reference_type: A DeadlineReference.TYPES for when the
deadline should be evaluated ("DAGRUN_CREATED",
"DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
"""
- from airflow.models.deadline import ReferenceModels
-
# Default to DAGRUN_CREATED if no deadline_reference_type specified
if deadline_reference_type is None:
deadline_reference_type = cls.TYPES.DAGRUN_CREATED
# Validate the reference class inherits from BaseDeadlineReference
- if not issubclass(reference_class,
ReferenceModels.BaseDeadlineReference):
+ if not issubclass(reference_class, BaseDeadlineReference):
Review Comment:
Good call! It might break for such users, so let me add a compat shim. I was
under the impression that, since this is an experimental feature, we should be
OK to do it, but maybe that's not a great idea!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]