o-nikolas commented on code in PR #55088:
URL: https://github.com/apache/airflow/pull/55088#discussion_r2350216427
##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -366,6 +366,62 @@ def _evaluate_with(self, *, session: Session, **kwargs:
Any) -> datetime:
return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
+ @dataclass
+ class AverageRuntimeDeadline(BaseDeadlineReference):
+ """A deadline that calculates the average runtime from past DAG
runs."""
+
+ DEFAULT_LIMIT = 10
+ limit: int
+ required_kwargs = {"dag_id"}
+
+ @provide_session
+ def _evaluate_with(self, *, session: Session, **kwargs: Any) ->
datetime:
+ from airflow.models import DagRun
+
+ dag_id = kwargs["dag_id"]
+
+ # Query for completed DAG runs with both start and end dates
+ # Order by logical_date descending to get most recent runs first
+ query = (
+ select(func.extract("epoch", DagRun.end_date -
DagRun.start_date))
+ .filter(DagRun.dag_id == dag_id,
DagRun.start_date.isnot(None), DagRun.end_date.isnot(None))
+ .order_by(DagRun.logical_date.desc())
+ )
+
+ # Apply limit
+ query = query.limit(self.limit)
+
+ # Get all durations and calculate average
+ durations = session.execute(query).scalars().all()
+
+ if len(durations) < self.limit:
+ logger.warning(
+ "In the AverageRuntimeDeadline: Only %d completed DAG runs
found for dag_id: %s (need %d), using 48 hour default",
+ len(durations),
+ dag_id,
+ self.limit,
+ )
+ avg_seconds = 48 * 3600 # 48 hours as default
Review Comment:
I still think we should not invent a default deadline that will likely not
work for any user. Is it not possible to simply disable the deadline when we
don't have enough data points? For example, return None, and the rest of the
deadline logic would know to discard this deadline.
That said, I'm not going to die on this hill if you and @ferruzzi prefer it
this way.
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -371,11 +373,25 @@ def test_deadline_database_integration(self, reference,
expected_column, session
if expected_column is not None:
result = reference.evaluate_with(session=session,
interval=interval, **conditions)
mock_fetch.assert_called_once_with(expected_column,
session=session, **conditions)
+ elif reference == DeadlineReference.AVERAGE_RUNTIME:
+ with mock.patch("airflow._shared.timezones.timezone.utcnow")
as mock_utcnow:
+ mock_utcnow.return_value = DEFAULT_DATE
+ with mock.patch.object(session, "execute") as mock_execute:
Review Comment:
What was the resolution on this? I see the thread was marked resolved, but I
don't see any replies here, and the code still seems to be mocking
session.execute.
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -375,11 +377,25 @@ def test_deadline_database_integration(self, reference,
expected_column, session
if expected_column is not None:
result = reference.evaluate_with(session=session,
interval=interval, **conditions)
mock_fetch.assert_called_once_with(expected_column,
session=session, **conditions)
+ elif reference == DeadlineReference.AVERAGE_RUNTIME():
+ with mock.patch("airflow._shared.timezones.timezone.utcnow")
as mock_utcnow:
+ mock_utcnow.return_value = DEFAULT_DATE
+ with mock.patch.object(session, "execute") as mock_execute:
+ # Mock the result object that execute() returns
+ mock_result = mock.Mock()
+ mock_scalars = mock.Mock()
+ mock_scalars.all.return_value = [3600]
+ mock_result.scalars.return_value = mock_scalars
+ mock_execute.return_value = mock_result
+ result = reference.evaluate_with(session=session,
interval=interval, dag_id=DAG_ID)
+ mock_fetch.assert_not_called()
+ # Should be DEFAULT_DATE + default average deadline of
48 hours + the 1 hour mocked execution time
Review Comment:
This tests the default (insufficient-history) path, but where is the test for
the path where there is enough history to compute an actual average?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]