This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ddd2840e37 Add INFO-level logging to asset scheduling path (#63958)
5ddd2840e37 is described below
commit 5ddd2840e37347f3ee4c0a175556ddc65c340e06
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 21 02:49:57 2026 +0000
Add INFO-level logging to asset scheduling path (#63958)
The scheduler's asset (dataset) condition evaluation path had zero
INFO-level logging, making it impossible to debug why asset-triggered
DagRuns were created or skipped. This was a significant gap discovered
during a P1 customer incident where the root cause took hours to trace
because the scheduler was a complete black box for asset scheduling.
Add INFO- and DEBUG-level log lines for:
- ADRQ records loaded per DAG (count and DAG IDs)
- Asset condition evaluation result (met/not met per DAG)
- DAGs deferred due to max_active_runs
- Asset-triggered DagRun creation (DAG ID, triggered_date, queued count)
- Count of consumed asset events per DagRun
- ADRQ rows deleted after DagRun creation
* Address review feedback: fix casing in log messages and mypy error
- Change "DAGs" to "Dags" in asset-triggered log messages (dag.py)
- Fix mypy attr-defined error by casting session.execute() result to
CursorResult, matching the existing pattern in delete_dag.py
---
.../src/airflow/jobs/scheduler_job_runner.py | 31 ++++++++++++++++++----
airflow-core/src/airflow/models/dag.py | 14 +++++++++-
2 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8400e13cd79..c25e5e2adff 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -31,7 +31,7 @@ from contextlib import ExitStack
from datetime import date, datetime, timedelta
from functools import lru_cache, partial
from itertools import groupby
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
from sqlalchemy import CTE, and_, delete, exists, func, inspect, or_, select,
text, tuple_, update
from sqlalchemy.exc import DBAPIError, OperationalError
@@ -107,6 +107,7 @@ if TYPE_CHECKING:
from types import FrameType
from pendulum.datetime import DateTime
+ from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session
from sqlalchemy.orm.interfaces import LoaderOption
from sqlalchemy.sql.selectable import Subquery
@@ -2042,6 +2043,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
continue
triggered_date: DateTime =
timezone.coerce_datetime(queued_adrqs[0].created_at)
+ self.log.debug(
+ "Creating asset-triggered DagRun for '%s': %d queued assets,
triggered_date=%s",
+ dag.dag_id,
+ len(queued_adrqs),
+ triggered_date,
+ )
cte = (
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
.where(
@@ -2090,14 +2097,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
Stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
+ self.log.info(
+ "Created asset-triggered DagRun for '%s': run_id=%s, consumed
%d asset events",
+ dag.dag_id,
+ dag_run.run_id,
+ len(asset_events),
+ )
# Delete only consumed ADRQ rows to avoid dropping newly queued
events
# (e.g. DagRun triggered by asset A while a new event for asset B
arrives).
adrq_pks = [(record.asset_id, record.target_dag_id) for record in
queued_adrqs]
- session.execute(
- delete(AssetDagRunQueue).where(
- tuple_(AssetDagRunQueue.asset_id,
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
- )
+ result = cast(
+ "CursorResult",
+ session.execute(
+ delete(AssetDagRunQueue).where(
+ tuple_(AssetDagRunQueue.asset_id,
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+ )
+ ),
+ )
+ self.log.info(
+ "Deleted %d ADRQ rows for '%s'",
+ result.rowcount,
+ dag.dag_id,
)
def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session)
-> dict[int, Backfill]:
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index fb462aac4e0..677fbc26048 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -665,6 +665,12 @@ class DagModel(Base):
else:
adrq_by_dag[adrq.target_dag_id].append(adrq)
+ if adrq_by_dag:
+ log.info(
+ "Asset-triggered Dags with queued events: %s",
+ {dag_id: len(adrqs) for dag_id, adrqs in adrq_by_dag.items()},
+ )
+
dag_statuses: dict[str, dict[UKey, bool]] = {
dag_id: {SerializedAssetUniqueKey.from_asset(adrq.asset): True for
adrq in adrqs}
for dag_id, adrqs in adrq_by_dag.items()
@@ -673,7 +679,9 @@ class DagModel(Base):
for ser_dag in ser_dags:
dag_id = ser_dag.dag_id
statuses = dag_statuses[dag_id]
- if not dag_ready(dag_id,
cond=ser_dag.dag.timetable.asset_condition, statuses=statuses):
+ ready = dag_ready(dag_id,
cond=ser_dag.dag.timetable.asset_condition, statuses=statuses)
+ if not ready:
+ log.debug("Asset condition not met for dag '%s'", dag_id)
del adrq_by_dag[dag_id]
del dag_statuses[dag_id]
del dag_statuses
@@ -698,6 +706,10 @@ class DagModel(Base):
)
)
if exclusion_list:
+ log.info(
+ "Asset-triggered Dags at max_active_runs, deferring: %s",
+ exclusion_list,
+ )
asset_triggered_dag_ids -= exclusion_list
triggered_date_by_dag = {
k: v for k, v in triggered_date_by_dag.items() if k not in
exclusion_list