phanikumv commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3341596946
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1863,40 +1871,225 @@ def _do_scheduling(self, session: Session) -> int:
return num_queued_tis
+ def _check_rollup_asset_status(
+ self,
+ *,
+ asset_id: int,
+ apdr: AssetPartitionDagRun,
+ mapper: RollupMapper,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ expected = mapper.to_upstream(apdr.partition_key)
+ return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+ def _resolve_asset_partition_status(
+ self,
+ *,
+ session: Session,
+ asset_id: int,
+ name: str,
+ uri: str,
+ apdr: AssetPartitionDagRun,
+ timetable: PartitionedAssetTimetable,
+ actual_by_asset: dict[int, set[str]],
+ ) -> bool:
+ """
+ Return whether *asset_id* has been satisfied for *apdr*.
+
+ Non-rollup assets resolve to ``True`` because the caller only invokes
+ this for assets that already have at least one logged event for *APDR*
+ (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+ the non-rollup contract for "received". Rollup assets defer to
+ :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+ A misconfigured mapper that raises returns ``False`` (treated as
+ not-yet-satisfied); the exception is logged at ``ERROR`` level in the
+ scheduler log so operators can diagnose the misconfiguration.
+ """
+ try:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ if not mapper.is_rollup:
+ return True
+ return self._check_rollup_asset_status(
+ asset_id=asset_id,
+ apdr=apdr,
+ mapper=cast("RollupMapper", mapper),
+ actual_by_asset=actual_by_asset,
+ )
+ except Exception:
+ self.log.exception(
+ "Failed to evaluate rollup status for asset; treating as
not-yet-satisfied. "
+ "This likely indicates a misconfigured partition mapper.",
+ dag_id=apdr.target_dag_id,
+ partition_key=apdr.partition_key,
+ asset_name=name,
+ asset_uri=uri,
+ )
+ return False
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
+ """
+ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
+
+ Returns the set of ``dag_id`` strings that received a new
partition-driven Dag run in this
+ tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to
exclude the same Dags
+ from the standard schedule-driven and asset-triggered creation paths
so a single Dag never
+ gets two Dag runs for the same tick when it appears in more than one
creation path. We
+ return ``dag_id`` strings rather than full Dag/DagRun objects because
the only downstream
+ use is membership lookup, and a heavier return type would just be
discarded.
+
+ Asset deactivation freezes pending APDRs: when an asset becomes
inactive
+ (orphan — no Dag declares it any more), its ``PartitionedAssetKeyLog``
rows
+ stop contributing to the rollup. If the consumer Dag still depends on
that
+ asset, firing on stale history would conflict with the declared
topology,
+ so the APDR waits. Reactivating the asset resumes evaluation
automatically.
+ This matches the UI's progress view (``_fetch_active_assets_per_dag``).
+ """
+ # Cap per-tick work so the scheduler transaction stays bounded and
other
+ # scheduling work isn't starved. Remaining APDRs drain across
subsequent ticks.
+ # FIFO is intentional: the oldest pending APDR fires first. A
persistently
+ # unsatisfiable APDR at the head (e.g. broken mapper, upstream that
will
+ # never arrive) blocks newer ones until an operator removes it or fixes
+ # the underlying mapper. We surface the stuck state rather than
silently
+ # rotating past it.
+ # `with_row_locks(skip_locked=True)` mirrors the sibling ADRQ claim
path:
+ # in HA two schedulers can otherwise both grab the same satisfied APDR
+ # and race the `created_dag_run_id` UPDATE, orphaning whichever DagRun
+ # loses. The `id` tiebreaker on `order_by` keeps LIMIT deterministic
when
+ # two APDRs share a `created_at` under bulk asset-event ingestion.
+ # SQLite is single-writer and silently drops `FOR UPDATE`, which is
fine.
+ pending_apdrs = session.scalars(
+ with_row_locks(
+ select(AssetPartitionDagRun)
+ .join(DagModel, DagModel.dag_id ==
AssetPartitionDagRun.target_dag_id)
+ .where(
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ DagModel.is_stale.is_(False),
+ )
+ .order_by(AssetPartitionDagRun.created_at,
AssetPartitionDagRun.id)
+ .limit(self._max_partition_dag_runs_per_loop),
+ of=AssetPartitionDagRun,
+ skip_locked=True,
+ key_share=False,
+ session=session,
+ )
+ ).all()
+ if not pending_apdrs:
+ return set()
+
+ # Pre-fetch all required serialized Dags in one query. The same map
+ # serves the stale-version cleanup below and the downstream rollup
+ # evaluation, so the table is only hit once per tick.
+ dag_ids = list({apdr.target_dag_id for apdr in pending_apdrs})
+ serdags_by_dag_id: dict[str, SerializedDagModel] = {
+ sd.dag_id: sd
+ for sd in
SerializedDagModel.get_latest_serialized_dags(dag_ids=dag_ids, session=session)
+ }
+
+ # Stale-version cleanup. An APDR stamped with a ``dag_version_id`` that
+ # no longer matches the Dag's latest serialized version was queued
+ # under a definition (mapper / window) that may not apply any more.
+ # Firing on partial data — or holding forever because the new mapper
+ # demands keys that will never arrive — would both be wrong, so the
+ # APDR + its PartitionedAssetKeyLog rows are dropped in the same
+ # transaction. Rows stamped ``NULL`` (legacy, pre-column) are likewise
+ # treated as stale on the first tick after upgrade.
+ stale_apdrs = [
+ apdr
+ for apdr in pending_apdrs
+ if (serdag := serdags_by_dag_id.get(apdr.target_dag_id)) is not
None
+ and apdr.dag_version_id != serdag.dag_version_id
Review Comment:
The issue with this approach is that `dag_version_id` changes on any
structural edit to the Dag, not just a change to the rollup's mapper/window. So
this cleanup fires far more often than intended. Suggested fix — discard only
when the rollup definition actually changed, not on every version bump.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]