Tegh25 commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3440437542
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2385,66 @@ def _mark_backfills_complete(self, *, session: Session = NEW_SESSION) -> None:
         for b in backfills:
             b.completed_at = now
-    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
-        """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
+    def _collect_skipped_intervals(
+        self,
+        serdag: SerializedDAG,
+        new_data_interval: DataInterval,
+        session: Session,
+    ) -> list[tuple[DateTime, DateTime]]:
+        """
+        Return a list of (start, end) tuples for intervals skipped due to catchup=False.
+
+        Computes the intervals that would have been scheduled between the previous
+        automated DagRun's data_interval_end and the new run's data_interval_start,
+        had catchup been True. Returns an empty list when there is no gap or when
+        no previous run exists.
+        """
+        if serdag.catchup:
+            return []
+        listener_has_impls = bool(
+            get_listener_manager().hook.on_intervals_skipped.get_hookimpls()  # type: ignore[attr-defined]
+        )
+        if not serdag.has_on_skipped_intervals_callback and not listener_has_impls:
+            return []
+
+        prev_run = session.scalar(
+            select(DagRun)
+            .where(
+                DagRun.dag_id == serdag.dag_id,
+                DagRun.run_type.in_([DagRunType.SCHEDULED]),
+                DagRun.data_interval_end.is_not(None),
+                DagRun.data_interval_end <= new_data_interval.start,
+            )
+            .order_by(DagRun.data_interval_end.desc())
+            .limit(1)
+        )
+        if prev_run is None or prev_run.data_interval_end is None:
+            return []
+
+        prev_end = prev_run.data_interval_end
+        new_start = new_data_interval.start
+        if prev_end >= new_start:
+            return []
+
+        skipped: list[tuple[DateTime, DateTime]] = []
+        for info in serdag.iter_dagrun_infos_between(prev_end, new_start):
Review Comment:
@ferruzzi #66791 proposed returning the full list of skipped (start, end)
intervals. That requires calling `iter_dagrun_infos_between()` on the
scheduler path, which takes O(n) time for n skipped intervals, and puts an
O(n) list in the callback/listener payload. I think the best solution is to
stop calling `iter_dagrun_infos_between()` and not return every skipped interval.
## Solution
The latest commit (as of writing,
[968793d](https://github.com/apache/airflow/pull/68359/commits/968793db2e18af4364acc4131c5e0da2861e288d))
changes the callback/listener so that it provides two values:
`skipped_interval_count`, the number of skipped intervals, and `skipped_range`,
the time span between the previous automated run and the new run. This keeps
both the scheduler-path computation and the payload at O(1) time and memory.
Since Dags can be scheduled with a cron expression or a `timedelta` object, this
implementation adds a `count_skipped_intervals_between()` method to the mixins:
- `airflow-core/src/airflow/timetables/_delta.py`
- `airflow-core/src/airflow/timetables/_cron.py`
This calculates the number of skipped intervals depending on how the Dag was
scheduled.
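For the `timedelta` case, the O(1) idea can be illustrated with a minimal sketch. This is not the code from the commit: the function name `count_fixed_delta_intervals` is hypothetical, and it assumes that only whole intervals that fit between the previous run's end and the new run's start are counted.

```python
from datetime import datetime, timedelta, timezone


def count_fixed_delta_intervals(
    prev_end: datetime, new_start: datetime, delta: timedelta
) -> int:
    """Count whole `delta`-sized intervals between prev_end and new_start.

    Hypothetical helper for illustration only; the PR's
    count_skipped_intervals_between() may differ in signature and rounding.
    """
    if delta <= timedelta(0):
        raise ValueError("delta must be positive")
    gap = new_start - prev_end
    if gap <= timedelta(0):
        return 0
    # timedelta // timedelta is integer floor division, so no iteration
    # over the individual intervals is needed.
    return gap // delta


prev_end = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
new_start = datetime(2024, 1, 1, 6, 30, tzinfo=timezone.utc)
hourly_gap_count = count_fixed_delta_intervals(prev_end, new_start, timedelta(hours=1))
```

The cost is constant regardless of how long the Dag was paused, which is the property the scheduler path needs.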
The limitation is that users who implement a custom Timetable for scheduling
will also need to write their own `count_skipped_intervals_between()` if they
want to use the skipped intervals callback/listener. Also, the cron method
handles only uniform (e.g. hourly), monthly, and weekly periods. Anything else
(e.g. complex DOM+DOW combinations, or `*/2`-hour schedules with DST edge cases
that break uniform-period detection) raises `AirflowTimetableInvalid` in the
current implementation. We may also need to refine how
`AirflowTimetableInvalid` is caught.
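One possible way to handle the unsupported cases is to degrade to "count unknown" while still reporting the range. The sketch below only illustrates that option under stated assumptions: `CountNotSupported` is a stand-in for whatever exception the counter raises (the commit uses `AirflowTimetableInvalid`), and `safe_skipped_count` is a hypothetical helper, not part of the PR.

```python
from datetime import datetime, timezone
from typing import Callable, Optional


class CountNotSupported(Exception):
    """Stand-in for the error raised on schedules the counter cannot handle."""


def safe_skipped_count(
    count_fn: Callable[[datetime, datetime], int],
    prev_end: datetime,
    new_start: datetime,
) -> Optional[int]:
    """Return the skipped-interval count, or None if it cannot be computed."""
    try:
        return count_fn(prev_end, new_start)
    except (CountNotSupported, NotImplementedError):
        # The caller can still report the skipped range without a count.
        return None


def unsupported_schedule(start: datetime, end: datetime) -> int:
    # Simulates a schedule whose period cannot be detected.
    raise CountNotSupported("non-uniform schedule")


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)
unknown = safe_skipped_count(unsupported_schedule, start, end)
known = safe_skipped_count(lambda s, e: 24, start, end)
```

With this shape, a custom Timetable that does not implement the counter would not prevent the callback/listener from firing; whether that is the desired behaviour is an open question.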
## Alternatives
1. We can implement a threshold with truncation. For example, if N is the
   maximum number of skipped intervals that may be returned in a
   callback/listener:
   - If count ≤ N: include the full `skipped_intervals` list (same behaviour as
     using `iter_dagrun_infos_between()`)
   - If count > N: include the count plus a truncated `skipped_intervals` list
2. We can also remove the `skipped_interval_count` field altogether and
   only return `skipped_range`. Users can call `iter_dagrun_infos_between()` in
   their callback/listener implementation if they need more information.
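Alternative 1 could be sketched roughly as follows. The function `build_skipped_payload` and the `truncated` flag are hypothetical names for illustration; the sketch assumes the count is already known from the O(1) counter and that the intervals come from a lazy iterator, so at most `max_items` of them are ever materialised.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Tuple


def build_skipped_payload(
    intervals: Iterable[Tuple[Any, Any]], count: int, max_items: int
) -> Dict[str, Any]:
    """Build a payload with the total count and at most max_items intervals."""
    # islice stops pulling from the iterator after max_items entries,
    # so the work done here is bounded by the threshold, not by the gap.
    head = list(islice(intervals, max_items))
    return {
        "skipped_interval_count": count,
        "skipped_intervals": head,
        "truncated": count > len(head),
    }


# Ten skipped intervals, threshold of three: the list is truncated.
long_gap = build_skipped_payload(((i, i + 1) for i in range(10)), count=10, max_items=3)
# Two skipped intervals, threshold of three: the full list is included.
short_gap = build_skipped_payload(((i, i + 1) for i in range(2)), count=2, max_items=3)
```

The trade-off is that the threshold N becomes one more setting to choose and document.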
Please let me know what you think of this implementation. We can also
roll back or try something else. Details are also included in the commit message.