This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1baafd48652 fix(Asset-Partition): sort partitioned DagRun by partition_date (#62866)
1baafd48652 is described below
commit 1baafd48652279a0f2214db885afae2890889731
Author: Wei Lee <[email protected]>
AuthorDate: Wed Mar 25 14:37:43 2026 +0800
fix(Asset-Partition): sort partitioned DagRun by partition_date (#62866)
---
.../src/airflow/dag_processing/collection.py | 7 +++--
.../tests/unit/dag_processing/test_collection.py | 31 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index e5925bdd3e5..1d60c32020f 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -136,7 +136,6 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
"""Build a select statement to retrieve the last partitioned run for each Dag."""
- # todo: AIP-76 we should add a partition date field
latest_run_id = (
select(DagRun.id)
.where(
@@ -149,7 +148,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
),
DagRun.partition_key.is_not(None),
)
- .order_by(DagRun.id.desc()) # todo: AIP-76 add partition date and sort by it here
+ .order_by(
+ DagRun.partition_date.is_(None),
+ DagRun.partition_date.desc(),
+ DagRun.run_after.desc(),
+ )
.limit(1)
.scalar_subquery()
)
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 1c12371de7d..77dc1318b05 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.collection import (
AssetModelOperation,
DagModelOperation,
_get_latest_runs_stmt,
+ _get_latest_runs_stmt_partitioned,
_update_dag_tags,
update_dag_parsing_results_in_db,
)
@@ -58,6 +59,7 @@ from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher
from airflow.serialization.definitions.assets import SerializedAsset
from airflow.serialization.encoders import ensure_serialized_asset
from airflow.serialization.serialized_objects import LazyDeserializedDAG
+from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import (
@@ -96,6 +98,35 @@ def test_statement_latest_runs_one_dag():
assert actual == expected, compiled_stmt
+@pytest.mark.db_test
+def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+ with dag_maker("fake-dag", schedule=None):
+ pass
+ dag_maker.sync_dagbag_to_db()
+
+ for i, (run_id, partition_key, partition_date) in enumerate(
+ (
+ ("newest-partition-date", "2025-01-02", tz.datetime(2025, 1, 2)),
+ ("older-partition-date", "2025-01-01", tz.datetime(2025, 1, 1)),
+ ("null-partition-date", "not-a-time-based-partition", None),
+ )
+ ):
+ dag_maker.create_dagrun(
+ run_id=run_id,
+ logical_date=None,
+ data_interval=None,
+ run_type=DagRunType.SCHEDULED,
+ run_after=tz.datetime(2025, 1, 1 + i),
+ partition_key=partition_key,
+ partition_date=partition_date,
+ session=session,
+ )
+
+ latest = session.scalar(_get_latest_runs_stmt_partitioned("fake-dag"))
+ assert latest is not None
+ assert latest.partition_date == tz.datetime(2025, 1, 2)
+
+
@pytest.mark.db_test
class TestAssetModelOperation:
@staticmethod