This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1baafd48652 fix(Asset-Partition): sort partitioned DagRun by partition_date (#62866)
1baafd48652 is described below

commit 1baafd48652279a0f2214db885afae2890889731
Author: Wei Lee <[email protected]>
AuthorDate: Wed Mar 25 14:37:43 2026 +0800

    fix(Asset-Partition): sort partitioned DagRun by partition_date (#62866)
---
 .../src/airflow/dag_processing/collection.py       |  7 +++--
 .../tests/unit/dag_processing/test_collection.py   | 31 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index e5925bdd3e5..1d60c32020f 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -136,7 +136,6 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
 
 def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
     """Build a select statement to retrieve the last partitioned run for each Dag."""
-    # todo: AIP-76 we should add a partition date field
     latest_run_id = (
         select(DagRun.id)
         .where(
@@ -149,7 +148,11 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
             ),
             DagRun.partition_key.is_not(None),
         )
-        .order_by(DagRun.id.desc())  # todo: AIP-76 add partition date and sort by it here
+        .order_by(
+            DagRun.partition_date.is_(None),
+            DagRun.partition_date.desc(),
+            DagRun.run_after.desc(),
+        )
         .limit(1)
         .scalar_subquery()
     )
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 1c12371de7d..77dc1318b05 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.collection import (
     AssetModelOperation,
     DagModelOperation,
     _get_latest_runs_stmt,
+    _get_latest_runs_stmt_partitioned,
     _update_dag_tags,
     update_dag_parsing_results_in_db,
 )
@@ -58,6 +59,7 @@ from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher
 from airflow.serialization.definitions.assets import SerializedAsset
 from airflow.serialization.encoders import ensure_serialized_asset
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import (
@@ -96,6 +98,35 @@ def test_statement_latest_runs_one_dag():
         assert actual == expected, compiled_stmt
 
 
+@pytest.mark.db_test
+def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session):
+    with dag_maker("fake-dag", schedule=None):
+        pass
+    dag_maker.sync_dagbag_to_db()
+
+    for i, (run_id, partition_key, partition_date) in enumerate(
+        (
+            ("newest-partition-date", "2025-01-02", tz.datetime(2025, 1, 2)),
+            ("older-partition-date", "2025-01-01", tz.datetime(2025, 1, 1)),
+            ("null-partition-date", "not-a-time-based-partition", None),
+        )
+    ):
+        dag_maker.create_dagrun(
+            run_id=run_id,
+            logical_date=None,
+            data_interval=None,
+            run_type=DagRunType.SCHEDULED,
+            run_after=tz.datetime(2025, 1, 1 + i),
+            partition_key=partition_key,
+            partition_date=partition_date,
+            session=session,
+        )
+
+    latest = session.scalar(_get_latest_runs_stmt_partitioned("fake-dag"))
+    assert latest is not None
+    assert latest.partition_date == tz.datetime(2025, 1, 2)
+
+
 @pytest.mark.db_test
 class TestAssetModelOperation:
     @staticmethod

Reply via email to