ferruzzi commented on code in PR #62688:
URL: https://github.com/apache/airflow/pull/62688#discussion_r2956709406
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -482,6 +478,36 @@ def deserialize_reference(cls, reference_data: dict):
min_runs=min_runs,
)
+ class JobStartDateDeadline(BaseDeadlineReference):
+ """A deadline that returns the start date of the latest active SchedulerJob."""
+
+ # By setting this to an empty set, we tell the system this reference
+ # doesn't need an 'id' or any other data from the DAG definition.
+ required_kwargs: set[str] = set()
+
+ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
+ """Find the start date of the most recently started running SchedulerJob."""
+ from sqlalchemy import desc
+
+ from airflow.jobs.job import Job
+
+ # We query the job table directly for the newest SchedulerJob that is still 'running'.
+ # This ensures we are always measuring against the current active platform engine.
+ stmt = (
+ select(Job.start_date)
+ .where(
+ Job.job_type == "SchedulerJob",
+ Job.state == "running",
+ )
+ .order_by(desc(Job.start_date))
+ .limit(1)
+ )
Review Comment:
I don't know why @amoghrajesh changed that. It is not hardcoded in the original version, [airflow.models.deadline._fetch_from_db](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/deadline.py#L493).
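To illustrate the alternative, here is a minimal sketch of the same "newest running scheduler job" lookup with the filter values taken from named definitions instead of inline string literals. It swaps SQLAlchemy for the standard-library `sqlite3` so it runs standalone. The `job` table layout, the `JobState` enum, the `SCHEDULER_JOB_TYPE` constant and the `latest_running_scheduler_start` function are hypothetical stand-ins, not Airflow's actual definitions.

```python
from __future__ import annotations

import sqlite3
from datetime import datetime
from enum import Enum


class JobState(str, Enum):
    """Hypothetical stand-in for a shared job-state enum."""

    RUNNING = "running"
    SUCCESS = "success"


# Hypothetical stand-in for a shared job-type constant.
SCHEDULER_JOB_TYPE = "SchedulerJob"


def latest_running_scheduler_start(conn: sqlite3.Connection) -> datetime | None:
    """Return the start_date of the newest running scheduler job, or None."""
    row = conn.execute(
        "SELECT start_date FROM job WHERE job_type = ? AND state = ? "
        "ORDER BY start_date DESC LIMIT 1",
        # Filter values come from the shared definitions, not literals.
        (SCHEDULER_JOB_TYPE, JobState.RUNNING.value),
    ).fetchone()
    # ISO-8601 text sorts chronologically, so ORDER BY on TEXT is safe here.
    return datetime.fromisoformat(row[0]) if row else None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE job (job_type TEXT, state TEXT, start_date TEXT)")
    conn.executemany(
        "INSERT INTO job VALUES (?, ?, ?)",
        [
            ("SchedulerJob", "success", "2024-01-03T00:00:00"),
            ("SchedulerJob", "running", "2024-01-02T00:00:00"),
            ("SchedulerJob", "running", "2024-01-01T00:00:00"),
            ("TriggererJob", "running", "2024-01-04T00:00:00"),
        ],
    )
    print(latest_running_scheduler_start(conn))  # 2024-01-02 00:00:00
```

If the state or job-type value ever changes, only the shared definition needs updating, and every query that references it stays in sync.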