Tegh25 commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3440437542


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session = NEW_SESSION) -> None:
         for b in backfills:
             b.completed_at = now
 
-    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
-        """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
+    def _collect_skipped_intervals(
+        self,
+        serdag: SerializedDAG,
+        new_data_interval: DataInterval,
+        session: Session,
+    ) -> list[tuple[DateTime, DateTime]]:
+        """
+        Return a list of (start, end) tuples for intervals skipped due to catchup=False.
+
+        Computes the intervals that would have been scheduled between the previous
+        automated DagRun's data_interval_end and the new run's data_interval_start,
+        had catchup been True.  Returns an empty list when there is no gap or when
+        no previous run exists.
+        """
+        if serdag.catchup:
+            return []
+        listener_has_impls = bool(
+            get_listener_manager().hook.on_intervals_skipped.get_hookimpls()  # type: ignore[attr-defined]
+        )
+        if not serdag.has_on_skipped_intervals_callback and not listener_has_impls:
+            return []
+
+        prev_run = session.scalar(
+            select(DagRun)
+            .where(
+                DagRun.dag_id == serdag.dag_id,
+                DagRun.run_type.in_([DagRunType.SCHEDULED]),
+                DagRun.data_interval_end.is_not(None),
+                DagRun.data_interval_end <= new_data_interval.start,
+            )
+            .order_by(DagRun.data_interval_end.desc())
+            .limit(1)
+        )
+        if prev_run is None or prev_run.data_interval_end is None:
+            return []
+
+        prev_end = prev_run.data_interval_end
+        new_start = new_data_interval.start
+        if prev_end >= new_start:
+            return []
+
+        skipped: list[tuple[DateTime, DateTime]] = []
+        for info in serdag.iter_dagrun_infos_between(prev_end, new_start):

Review Comment:
   @ferruzzi #66791 proposed returning the full list of skipped (start, end) intervals. That would require calling `iter_dagrun_infos_between()` on the scheduler path, which takes O(n) time for n skipped intervals, and carrying an O(n) list in the callback/listener payload. I think the best solution is to stop calling `iter_dagrun_infos_between()` and not return every skipped interval.
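To make the O(n) cost concrete, here is a minimal sketch that uses a plain fixed-`timedelta` loop as a simplified stand-in for what `iter_dagrun_infos_between()` produces. The helper `enumerate_skipped` is hypothetical and not Airflow code. A Dag on an hourly schedule whose previous run ended a year before the new run yields one tuple per skipped hour:

```python
from datetime import datetime, timedelta, timezone


def enumerate_skipped(
    prev_end: datetime, new_start: datetime, step: timedelta
) -> list[tuple[datetime, datetime]]:
    """Materialise every skipped (start, end) pair: O(n) time and O(n) memory."""
    skipped: list[tuple[datetime, datetime]] = []
    start = prev_end
    # One loop iteration, and one list entry, per skipped interval.
    while start + step <= new_start:
        skipped.append((start, start + step))
        start += step
    return skipped


prev_end = datetime(2024, 1, 1, tzinfo=timezone.utc)
new_start = datetime(2025, 1, 1, tzinfo=timezone.utc)
skipped = enumerate_skipped(prev_end, new_start, timedelta(hours=1))
print(len(skipped))  # 8784: 366 days in leap year 2024 * 24 hours
```

Both the work done on the scheduler path and the size of the payload grow linearly with how long the Dag was paused.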
   
   ## Solution
   
   The latest commit (as of writing, [968793d](https://github.com/apache/airflow/pull/68359/commits/968793db2e18af4364acc4131c5e0da2861e288d)) changes the callback/listener payload to carry two fields instead: `skipped_interval_count`, the number of skipped intervals, and `skipped_range`, the time span from the previous automated run's `data_interval_end` to the new run's `data_interval_start`. This makes both the scheduler-side computation and the payload O(1) in time and memory. Because Dags can be scheduled with either a cron expression or a `timedelta` object, this implementation adds a `count_skipped_intervals_between()` method to the timetable mixins in:
   - `airflow-core/src/airflow/timetables/_delta.py`
   - `airflow-core/src/airflow/timetables/_cron.py`

   Each method calculates the number of skipped intervals according to how the Dag is scheduled.
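For the `timedelta` case, the count reduces to floor division of the gap by the interval length. A minimal sketch, assuming a fixed-length `timedelta` (a `relativedelta` schedule would need different handling) and a gap measured from the previous run's `data_interval_end` to the new run's `data_interval_start`. `count_skipped_delta` and `build_payload` are hypothetical names, not the signatures in the commit:

```python
from datetime import datetime, timedelta, timezone


def count_skipped_delta(prev_end: datetime, new_start: datetime, delta: timedelta) -> int:
    """Count whole `delta`-sized intervals that fit between prev_end and new_start, in O(1)."""
    if new_start <= prev_end:
        return 0
    # timedelta // timedelta is integer floor division and returns an int.
    return (new_start - prev_end) // delta


def build_payload(prev_end: datetime, new_start: datetime, delta: timedelta) -> dict:
    """Hypothetical O(1) payload using the two field names from the commit."""
    return {
        "skipped_interval_count": count_skipped_delta(prev_end, new_start, delta),
        "skipped_range": (prev_end, new_start),
    }


prev_end = datetime(2024, 1, 1, tzinfo=timezone.utc)
new_start = datetime(2025, 1, 1, tzinfo=timezone.utc)
payload = build_payload(prev_end, new_start, timedelta(hours=1))
print(payload["skipped_interval_count"])  # 8784 (366 days * 24 hours)
```

The payload stays two values regardless of the gap length; a partial trailing interval is not counted because floor division discards it.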
   
   The limitation is that anyone writing a custom Timetable will also need to implement their own `count_skipped_intervals_between()` to use the skipped-intervals callback/listener. In addition, the cron implementation only handles uniform periods (e.g. hourly), monthly periods, and weekly periods. Anything else, such as complex day-of-month plus day-of-week (DOM+DOW) combinations or `*/2` hour schedules whose DST transitions break uniform-period detection, raises `AirflowTimetableInvalid` in the current implementation. We may also need to refine where and how `AirflowTimetableInvalid` should be caught.
   
   ## Alternatives
   
   1. Implement a threshold with truncation. Let N be the maximum number of skipped intervals a callback/listener payload may contain:
       - If count ≤ N: include the full `skipped_intervals` list (same behaviour as using `iter_dagrun_infos_between()`).
       - If count > N: include the count plus a `skipped_intervals` list truncated to N entries.
   2. Remove the `skipped_interval_count` field altogether and return only `skipped_range`. Users who need more detail can call `iter_dagrun_infos_between()` in their own callback/listener implementation.
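Alternative 1 could look roughly like the sketch below, assuming a fixed `timedelta` schedule. `iter_skipped` is a hypothetical lazy stand-in for `iter_dagrun_infos_between()`, and the `truncated` flag is an illustrative extra field that this comment does not propose; `skipped_interval_count`, `skipped_range`, and `skipped_intervals` are the names used in the text:

```python
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone
from itertools import islice


def iter_skipped(
    prev_end: datetime, new_start: datetime, step: timedelta
) -> Iterator[tuple[datetime, datetime]]:
    """Lazily yield skipped (start, end) pairs; hypothetical stand-in for iter_dagrun_infos_between()."""
    start = prev_end
    while start + step <= new_start:
        yield (start, start + step)
        start += step


def build_truncated_payload(
    prev_end: datetime, new_start: datetime, step: timedelta, max_intervals: int
) -> dict:
    """O(1) count plus at most `max_intervals` concrete intervals."""
    count = (new_start - prev_end) // step if new_start > prev_end else 0
    # islice stops pulling from the generator after max_intervals items,
    # so work and memory are bounded by N rather than by the full count.
    intervals = list(islice(iter_skipped(prev_end, new_start, step), max_intervals))
    return {
        "skipped_interval_count": count,
        "skipped_range": (prev_end, new_start),
        "skipped_intervals": intervals,
        "truncated": count > max_intervals,
    }


prev_end = datetime(2024, 1, 1, tzinfo=timezone.utc)
new_start = datetime(2025, 1, 1, tzinfo=timezone.utc)
payload = build_truncated_payload(prev_end, new_start, timedelta(hours=1), max_intervals=100)
print(payload["skipped_interval_count"], len(payload["skipped_intervals"]), payload["truncated"])  # 8784 100 True
```

With this shape the scheduler-side cost is capped by N no matter how long the Dag was paused, at the price of one more setting (N) to choose and document.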
   
   Please let me know what you think of this implementation; we can also roll it back or try something else. Details are also included in the commit message.



-- 
This is an automated message from the Apache Git Service.