This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4c71e37848c Avoid lazy-loading timetable fields for latest DagRuns (#66488)
4c71e37848c is described below
commit 4c71e37848ce19d83bb9a62267afe440cbb50e5c
Author: Henry Chen <[email protected]>
AuthorDate: Tue Jun 2 21:31:44 2026 +0800
Avoid lazy-loading timetable fields for latest DagRuns (#66488)
---
.../src/airflow/dag_processing/collection.py | 6 ++++
.../tests/unit/dag_processing/test_collection.py | 42 +++++++++++++++++++---
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 785763bf9d8..7cdf490d938 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -129,8 +129,11 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
load_only(
DagRun.dag_id,
DagRun.logical_date,
+ DagRun.run_after,
DagRun.data_interval_start,
DagRun.data_interval_end,
+ DagRun.partition_key,
+ DagRun.partition_date,
)
)
)
@@ -165,8 +168,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
load_only(
DagRun.dag_id,
DagRun.logical_date,
+ DagRun.run_after,
DagRun.data_interval_start,
DagRun.data_interval_end,
+ DagRun.partition_key,
+ DagRun.partition_date,
)
)
)
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 8a536dbb12a..bc7a1490e16 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -27,7 +27,7 @@ from unittest import mock
from unittest.mock import patch
import pytest
-from sqlalchemy import delete, func, select
+from sqlalchemy import delete, func, inspect as sa_inspect, select
from sqlalchemy.exc import OperationalError, SAWarning
import airflow.dag_processing.collection
@@ -89,8 +89,9 @@ def test_statement_latest_runs_one_dag():
compiled_stmt = str(stmt.compile())
actual = [x.strip() for x in compiled_stmt.splitlines()]
expected = [
- "SELECT dag_run.id, dag_run.dag_id, dag_run.logical_date, "
- "dag_run.data_interval_start, dag_run.data_interval_end",
+ "SELECT dag_run.id, dag_run.dag_id, dag_run.logical_date, dag_run.data_interval_start, "
+ "dag_run.data_interval_end, dag_run.run_after, dag_run.partition_key, "
+ "dag_run.partition_date",
"FROM dag_run",
"WHERE dag_run.dag_id = :dag_id_1 AND dag_run.logical_date = ("
"SELECT max(dag_run.logical_date) AS max_logical_date",
@@ -101,11 +102,38 @@ def test_statement_latest_runs_one_dag():
@pytest.mark.db_test
-def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+def test_statement_latest_runs_loads_timetable_fields(dag_maker, session):
with dag_maker("fake-dag", schedule=None):
pass
dag_maker.sync_dagbag_to_db()
+ logical_date = tz.datetime(2025, 1, 1)
+ run_after = tz.datetime(2025, 1, 2)
+
+ dag_maker.create_dagrun(
+ run_id="latest-run",
+ logical_date=logical_date,
+ data_interval=(logical_date, run_after),
+ run_type=DagRunType.SCHEDULED,
+ run_after=run_after,
+ session=session,
+ )
+ session.flush()
+ session.expunge_all() # Ensure we load from DB, not from session cache
+
+ latest = session.scalar(_get_latest_runs_stmt("fake-dag"))
+ assert latest is not None
+ assert {"run_after", "partition_date", "partition_key"}.isdisjoint(sa_inspect(latest).unloaded)
+ assert latest.run_after == run_after
+ assert latest.partition_key is None
+ assert latest.partition_date is None
+
+
+@pytest.mark.db_test
+def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+ with dag_maker("fake-dag", schedule=None):
+ pass
+ dag_maker.sync_dagbag_to_db()
for i, (run_id, partition_key, partition_date) in enumerate(
(
("newest-partition-date", "2025-01-02", tz.datetime(2025, 1, 2)),
@@ -124,8 +152,14 @@ def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, s
session=session,
)
+ session.flush()
+ session.expunge_all() # Ensure we load from DB, not from session cache
+
latest = session.scalar(_get_latest_runs_stmt_partitioned("fake-dag"))
assert latest is not None
+ assert {"run_after", "partition_date", "partition_key"}.isdisjoint(sa_inspect(latest).unloaded)
+ assert latest.run_after == tz.datetime(2025, 1, 1)
+ assert latest.partition_key == "2025-01-02"
assert latest.partition_date == tz.datetime(2025, 1, 2)