This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2963064c06f Derive partition_date for composite keys with one temporal dimension (#68442)
2963064c06f is described below

commit 2963064c06f948d67c22056360b2f5045aa932d1
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 14:52:57 2026 +0800

    Derive partition_date for composite keys with one temporal dimension (#68442)
---
 airflow-core/src/airflow/models/backfill.py        |  1 +
 .../src/airflow/partition_mappers/product.py       | 31 +++++++++++-
 airflow-core/tests/unit/models/test_backfill.py    | 58 +++++++++++++++++++++-
 .../tests/unit/partition_mappers/test_product.py   | 40 +++++++++++++++
 4 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index f27f236e171..52019fa5d98 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -602,6 +602,7 @@ def _handle_clear_run(
             backfill_id=backfill_id,
             dag_run_id=dr.id,
             logical_date=info.logical_date,
+            partition_key=info.partition_key,
             sort_ordinal=sort_ordinal,
         )
     )
diff --git a/airflow-core/src/airflow/partition_mappers/product.py b/airflow-core/src/airflow/partition_mappers/product.py
index e7c91aad082..367ac4fda84 100644
--- a/airflow-core/src/airflow/partition_mappers/product.py
+++ b/airflow-core/src/airflow/partition_mappers/product.py
@@ -17,10 +17,13 @@

 from __future__ import annotations

-from typing import Any
+from typing import TYPE_CHECKING, Any

 from airflow.partition_mappers.base import PartitionMapper

+if TYPE_CHECKING:
+    from datetime import datetime
+

 class ProductMapper(PartitionMapper):
     """Partition mapper that combines multiple mappers into a multi-dimensional key."""
@@ -53,6 +56,32 @@ class ProductMapper(PartitionMapper):
             results.append(result)
         return self.delimiter.join(results)

+    def to_partition_date(self, downstream_key: str) -> datetime | None:
+        """
+        Return the temporal anchor for *downstream_key*, or ``None`` when ambiguous.
+
+        Splits *downstream_key* by :attr:`delimiter` and calls each child mapper's
+        ``to_partition_date`` on the corresponding segment. Exactly one non-``None``
+        result is returned as the anchor. Zero temporal children (all categorical)
+        and two or more temporal children (ambiguous) both return ``None`` —
+        the convention is at most one time-based dimension per product key.
+
+        An unexpected segment count (key does not match the number of child mappers)
+        returns ``None`` rather than raising, matching the scheduler's rule of leaving
+        ``partition_date`` unset when the input is ambiguous.
+        """
+        segments = downstream_key.split(self.delimiter)
+        if len(segments) != len(self.mappers):
+            return None
+        anchors = [
+            anchor
+            for mapper, segment in zip(self.mappers, segments)
+            if (anchor := mapper.to_partition_date(segment)) is not None
+        ]
+        if len(anchors) == 1:
+            return anchors[0]
+        return None
+
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.encoders import encode_partition_mapper

diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py
index 531d0ef01ab..538c33a2461 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -41,11 +41,12 @@ from airflow.models.backfill import (
     _create_backfill,
     _do_dry_run,
     _get_latest_dag_run_row_query,
+    _handle_clear_run,
 )
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import Asset, CronPartitionTimetable, PartitionedAssetTimetable
 from airflow.ti_deps.dep_context import DepContext
-from airflow.timetables.base import DagRunInfo
+from airflow.timetables.base import DagRunInfo, DataInterval
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.strings import get_random_string
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -1319,3 +1320,58 @@ def test_partitioned_backfill_reprocess_failed(dag_maker, session):
     assert bdr is not None
     assert bdr.dag_run_id == backfill_run.id
     assert bdr.partition_key == info.partition_key
+
+
+def test_handle_clear_run_preserves_partition_key(dag_maker, session):
+    """BackfillDagRun created via the clear/reprocess path carries partition_key from info."""
+
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    logical_date = timezone.parse("2026-01-10")
+    dr = dag_maker.create_dagrun(
+        run_id="scheduled_2026-01-10",
+        logical_date=logical_date,
+        session=session,
+        state="failed",
+    )
+    session.commit()
+
+    # Create a Backfill row so the foreign-key constraint is satisfied.
+    backfill = Backfill(
+        dag_id=dag.dag_id,
+        from_date=logical_date,
+        to_date=logical_date,
+        dag_run_conf=None,
+        max_active_runs=1,
+        reprocess_behavior=ReprocessBehavior.FAILED,
+    )
+    session.add(backfill)
+    session.flush()
+
+    partition_key = "2026-01-10T00:00:00"
+    info = DagRunInfo(
+        run_after=logical_date,
+        data_interval=DataInterval(logical_date, logical_date),
+        partition_key=partition_key,
+        partition_date=None,
+    )
+
+    _handle_clear_run(
+        session=session,
+        dag=dag,
+        dr=dr,
+        info=info,
+        backfill_id=backfill.id,
+        sort_ordinal=1,
+    )
+    session.flush()
+
+    bdr = session.scalar(
+        select(BackfillDagRun).where(
+            BackfillDagRun.backfill_id == backfill.id,
+            BackfillDagRun.dag_run_id == dr.id,
+        )
+    )
+    assert bdr is not None
+    assert bdr.partition_key == partition_key
diff --git a/airflow-core/tests/unit/partition_mappers/test_product.py b/airflow-core/tests/unit/partition_mappers/test_product.py
index 9d6eba1eb21..ce77441ef40 100644
--- a/airflow-core/tests/unit/partition_mappers/test_product.py
+++ b/airflow-core/tests/unit/partition_mappers/test_product.py
@@ -17,6 +17,8 @@

 from __future__ import annotations

+from datetime import datetime, timezone
+
 import pytest

 from airflow.partition_mappers.identity import IdentityMapper
@@ -120,3 +122,41 @@ class TestProductMapper:
         mapper = ProductMapper(StartOfHourMapper(), StartOfDayMapper())
         encoded_var = encode_partition_mapper(mapper)[Encoding.VAR]
         assert "max_downstream_keys" not in encoded_var
+
+    @pytest.mark.parametrize(
+        ("mapper", "downstream_key", "expected"),
+        [
+            pytest.param(
+                ProductMapper(StartOfDayMapper(), IdentityMapper()),
+                "2024-01-15|us-east-1",
+                datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc),
+                id="one-temporal-one-categorical-returns-temporal-anchor",
+            ),
+            pytest.param(
+                ProductMapper(IdentityMapper(), StartOfDayMapper()),
+                "us-east-1|2024-01-15",
+                datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc),
+                id="categorical-first-temporal-second-returns-temporal-anchor",
+            ),
+            pytest.param(
+                ProductMapper(StartOfDayMapper(), StartOfHourMapper()),
+                "2024-01-15|2024-01-15T10",
+                None,
+                id="two-temporal-children-returns-none",
+            ),
+            pytest.param(
+                ProductMapper(IdentityMapper(), IdentityMapper()),
+                "us-east-1|batch-42",
+                None,
+                id="all-categorical-returns-none",
+            ),
+            pytest.param(
+                ProductMapper(StartOfDayMapper(), IdentityMapper()),
+                "2024-01-15",
+                None,
+                id="wrong-segment-count-returns-none",
+            ),
+        ],
+    )
+    def test_to_partition_date(self, mapper, downstream_key, expected):
+        assert mapper.to_partition_date(downstream_key) == expected

Reply via email to