ferruzzi commented on code in PR #55088:
URL: https://github.com/apache/airflow/pull/55088#discussion_r2350292436


##########
airflow-core/src/airflow/models/deadline.py:
##########
@@ -366,6 +366,62 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime:
 
             return _fetch_from_db(DagRun.queued_at, session=session, **kwargs)
 
+    @dataclass
+    class AverageRuntimeDeadline(BaseDeadlineReference):
+        """A deadline that calculates the average runtime from past DAG runs."""
+
+        DEFAULT_LIMIT = 10
+        limit: int
+        required_kwargs = {"dag_id"}
+
+        @provide_session
+        def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime:
+            from airflow.models import DagRun
+
+            dag_id = kwargs["dag_id"]
+
+            # Query for completed DAG runs with both start and end dates
+            # Order by logical_date descending to get most recent runs first
+            query = (
+                select(func.extract("epoch", DagRun.end_date - DagRun.start_date))
+                .filter(DagRun.dag_id == dag_id, DagRun.start_date.isnot(None), DagRun.end_date.isnot(None))
+                .order_by(DagRun.logical_date.desc())
+            )
+
+            # Apply limit
+            query = query.limit(self.limit)
+
+            # Get all durations and calculate average
+            durations = session.execute(query).scalars().all()
+
+            if len(durations) < self.limit:
+                logger.warning(
+                    "In the AverageRuntimeDeadline: Only %d completed DAG runs found for dag_id: %s (need %d), using 48 hour default",
+                    len(durations),
+                    dag_id,
+                    self.limit,
+                )
+                avg_seconds = 48 * 3600  # 48 hours as default

Review Comment:
   Had a chat with Sean offline. He's going to rename "limit" to a
max-number-of-records and add an optional min-number-of-records, which
defaults to the same value. The logic will be:
   
   - Query the db for up to `max_records` historical durations.
   - If the resulting list has fewer than `min_records` entries, return None
(no deadline set); otherwise use the average of what you have.
   
   The min will default to the max unless the user overrides it. So in
practice:
   
   | min_records | max_records | Result |
   |-------------|-------------|--------|
   | not set     | 10          | No failures until run 11, which uses the average of the 10 previous |
   | 10          | 10          | No failures until run 11, which uses the average of the 10 previous |
   | 1           | 10          | First run cannot fail, second run fails if it runs longer than the first, third run averages the previous two, etc. up to 10 |
   | 5           | 10          | First 5 cannot fail, run 6 uses the average of the 5 previous values, runs 7-10 build up that average, run 11 uses the 10-run average |
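
A rough sketch of the selection logic described above, in plain Python and outside of any Airflow code. The function name `average_duration`, its signature, and the argument validation are assumptions made for illustration; only `min_records` and `max_records` come from the comment. It assumes the durations arrive most-recent-first, as the query in the diff orders them.

```python
from __future__ import annotations

from statistics import fmean


def average_duration(
    durations: list[float],
    max_records: int = 10,
    min_records: int | None = None,
) -> float | None:
    """Average up to max_records durations, or None if fewer than min_records exist.

    Hypothetical helper: `durations` is assumed to be ordered most recent first.
    """
    # The min defaults to the max unless the caller overrides it.
    if min_records is None:
        min_records = max_records

    # Assumed validation (not discussed in the review): a min below 1 would
    # allow averaging an empty list, and a min above the max can never be met.
    if not 1 <= min_records <= max_records:
        raise ValueError("expected 1 <= min_records <= max_records")

    # Mirrors "query the db for up to max_records historical durations".
    recent = durations[:max_records]

    # Too little history: no deadline is set.
    if len(recent) < min_records:
        return None

    return fmean(recent)
```

With `min_records=5` and `max_records=10`, four recorded durations give `None`, five give the 5-run average, and anything past ten is averaged over the ten most recent only.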



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
