ferruzzi commented on code in PR #68359:
URL: https://github.com/apache/airflow/pull/68359#discussion_r3456181305
##########
task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py:
##########
@@ -19,7 +19,11 @@
from cadwyn import HeadVersion, Version, VersionBundle
+from airflow.sdk.execution_time.schema.versions.v2026_06_23 import
AddDagSkippedIntervalsCallbackRequest
+
bundle = VersionBundle(
HeadVersion(),
+ Version("2026-06-23", AddDagSkippedIntervalsCallbackRequest),
Version("2026-06-16"),
+ Version("2026-05-23"),
Review Comment:
Please leave this comment open as a reminder to verify this file before we
merge.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2374,8 +2390,56 @@ def _mark_backfills_complete(self, *, session: Session =
NEW_SESSION) -> None:
for b in backfills:
b.completed_at = now
- def _create_dag_runs(self, dag_models: Collection[DagModel], session:
Session) -> None:
- """Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
+ def _collect_skipped_intervals(
+ self,
+ serdag: SerializedDAG,
+ new_data_interval: DataInterval,
+ session: Session,
+ ) -> SkippedIntervalsSummary | None:
+ """
+ Summarize intervals skipped due to catchup=False.
+
+ Computes the intervals that would have been scheduled between the
previous
+ automated DagRun's data_interval_end and the new run's
data_interval_start,
+ had catchup been True. Returns ``None`` when there is no gap or when
+ no previous run exists.
+ """
+ if serdag.catchup:
+ return None
+ listener_has_impls = bool(
+ get_listener_manager().hook.on_intervals_skipped.get_hookimpls()
# type: ignore[attr-defined]
+ )
+ if not serdag.has_on_skipped_intervals_callback and not
listener_has_impls:
+ return None
+
+ prev_run = session.scalar(
+ select(DagRun)
+ .where(
+ DagRun.dag_id == serdag.dag_id,
+ DagRun.run_type == DagRunType.SCHEDULED,
+ DagRun.data_interval_end.is_not(None),
+ DagRun.data_interval_end <= new_data_interval.start,
+ )
+ .order_by(DagRun.data_interval_end.desc())
+ .limit(1)
+ )
Review Comment:
I have not thought this through entirely, but consider: Should this be
filtering on `DagRun.state == SUCCESS` as well? Give it a thought and let me
know what you decide.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]