ferruzzi commented on code in PR #55088:
URL: https://github.com/apache/airflow/pull/55088#discussion_r2356079332
##########
airflow-core/tests/unit/models/test_deadline.py:
##########
@@ -375,11 +383,145 @@ def test_deadline_database_integration(self, reference,
expected_column, session
if expected_column is not None:
result = reference.evaluate_with(session=session,
interval=interval, **conditions)
mock_fetch.assert_called_once_with(expected_column,
session=session, **conditions)
+ elif reference == DeadlineReference.AVERAGE_RUNTIME():
+ with mock.patch("airflow._shared.timezones.timezone.utcnow")
as mock_utcnow:
+ mock_utcnow.return_value = DEFAULT_DATE
+ # No DAG runs exist, so it should use 24-hour default
+ result = reference.evaluate_with(session=session,
interval=interval, dag_id=DAG_ID)
+ mock_fetch.assert_not_called()
+ # Should return None when no DAG runs exist
+ assert result is None
else:
result = reference.evaluate_with(session=session,
interval=interval)
mock_fetch.assert_not_called()
+ assert result == DEFAULT_DATE + interval
+
+ @pytest.mark.usefixtures("freeze_time")
+ def test_average_runtime_with_sufficient_history(self, session, dag_maker):
+ """Test AverageRuntimeDeadline when enough historical data exists."""
+ with dag_maker(DAG_ID):
+ EmptyOperator(task_id="test_task")
+
+ # Create 10 completed DAG runs with known durations
+ base_time = DEFAULT_DATE
+ durations = [3600, 7200, 1800, 5400, 2700, 4500, 3300, 6000, 2400,
4200]
+
+ for i, duration in enumerate(durations):
+ logical_date = base_time + timedelta(days=i)
+ start_time = logical_date + timedelta(minutes=5)
+ end_time = start_time + timedelta(seconds=duration)
+
+ dagrun = dag_maker.create_dagrun(
+ logical_date=logical_date, run_id=f"test_run_{i}",
state=DagRunState.SUCCESS
+ )
+ # Manually set start and end times
+ dagrun.start_date = start_time
+ dagrun.end_date = end_time
+
+ session.commit()
+
+ # Test with default limit (10)
+ reference = DeadlineReference.AVERAGE_RUNTIME()
+ interval = timedelta(hours=1)
+
+ with mock.patch("airflow._shared.timezones.timezone.utcnow") as
mock_utcnow:
+ mock_utcnow.return_value = DEFAULT_DATE
+ result = reference.evaluate_with(session=session,
interval=interval, dag_id=DAG_ID)
+
+ # Calculate expected average: sum(durations) / len(durations)
+ expected_avg_seconds = sum(durations) / len(durations)
+ expected = DEFAULT_DATE + timedelta(seconds=expected_avg_seconds)
+ interval
+
+ # Compare only up to minutes to avoid sub-second timing issues in
CI
+ assert result.replace(second=0, microsecond=0) ==
expected.replace(second=0, microsecond=0)
Review Comment:
If you are doing this, do you still need the freeze_time fixture? Seems
like you should only need to do one or the other.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]