kaxil commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3442034157


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2086,7 +2091,7 @@ def _resolve_partition_date(
             return None
 
         if not anchors:
-            return None
+            return carried_partition_date

Review Comment:
   When an APDR mixes an `IdentityMapper` (which carries a date) with a 
temporal mapper on the same `target_key`, `carried_partition_date` is only 
honored on this `not anchors` path. The moment any temporal mapper resolves an 
anchor, `return anchors.pop()` (or the >1-anchor warning path) supersedes the 
carried date, with no carry-vs-anchor comparison and no warning even when the 
two disagree. `partition_mapper_config` is per-asset and a single APDR can be 
fed by multiple assets, so a mixed identity+temporal feed is reachable. Is the 
assumption that identity and temporal mappers never feed the same partitioned 
consumer? If so, worth asserting/logging it here rather than relying on it 
implicitly.



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2027,6 +2028,39 @@ def test_partition_key_in_context(self, 
create_runtime_ti, mock_supervisor_comms
         context = runtime_ti.get_template_context()
         assert context["partition_key"] == "some-partition"
 
+    def test_partition_date_in_context(self, create_runtime_ti, 
mock_supervisor_comms):
+        """Test that partition_date from dag_run is exposed in the template 
context."""
+        task = BaseOperator(task_id="hello")
+        runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+        dr = runtime_ti._ti_context_from_server.dag_run
+
+        mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+            data_interval_end=dr.logical_date - timedelta(hours=1),
+            data_interval_start=dr.logical_date - timedelta(hours=2),
+            start_date=dr.start_date - timedelta(hours=1),
+            end_date=dr.start_date,
+        )
+
+        context = runtime_ti.get_template_context()
+
+        # Default: partition_date is None
+        assert context["partition_date"] is None
+
+        # Set partition_date on dag_run and verify it surfaces in context
+        partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+        dr.partition_date = partition_date
+        context = runtime_ti.get_template_context()
+        assert context["partition_date"] == partition_date
+
+        # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts` 
filters
+        # operate on a real awareness boundary.
+        from datetime import datetime as _datetime

Review Comment:
   Nit: redundant inline import. `datetime` is already imported at the module 
top (line 27), so this aliased `from datetime import datetime as _datetime` can 
be dropped and `datetime(2026, 5, 20, 1, 0, 0)` used directly (it's naive just 
the same). Matches the inline-import cleanups requested earlier in the review.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -695,6 +722,27 @@ def _get_or_create_apdr(
                 .limit(1)
             )
             if latest_apdr and latest_apdr.created_dag_run_id is None:
+                existing_partition_date = latest_apdr.partition_date
+                if (
+                    existing_partition_date is not None
+                    and target_partition_date is not None
+                    and existing_partition_date != target_partition_date

Review Comment:
   This conflict guard only ever clears, it never adopts or recovers. If the 
APDR is first created with `partition_date=None` (a temporal/composite event, 
or an identity event whose producer had no date) and a later identity event 
carries `D`, the guard skips because `existing_partition_date is None`, so `D` 
is silently dropped, and if no temporal mapper feeds this key there's nothing 
to re-derive it from at run creation. Suppression is also sticky: once a 
`D1`/`D2` conflict has cleared the date to `None`, a later pair of agreeing 
events (`D2`/`D2`) never restores it. The new tests cover the `D1` vs `D2` 
suppression but not the None-then-`D` adopt path or the recover-after-conflict 
chain. Is the permanent-suppress behavior intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to