nathadfield commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3442125696


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2086,7 +2091,7 @@ def _resolve_partition_date(
             return None
 
         if not anchors:
-            return None
+            return carried_partition_date

Review Comment:
   @kaxil right, that's the intended contract: a partitioned consumer's feeding 
assets are expected to agree on the partition datetime, and when a temporal 
mapper resolves an anchor, it takes precedence over the carried identity date 
(the partition key is the authoritative source, since the scheduler can 
re-derive the date from it). I've added a comment here to make that explicit.
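
A minimal sketch of the precedence rule described above, not the actual `_resolve_partition_date` body. The function name, the `set` of anchors, and the "disagreeing anchors resolve to `None`" branch are assumptions for illustration only; the source confirms just that an anchor wins over the carried date and that the carried date is returned when there are no anchors.

```python
from __future__ import annotations

from datetime import datetime, timezone


def resolve_partition_date_sketch(
    anchors: set[datetime],
    carried_partition_date: datetime | None,
) -> datetime | None:
    """Hypothetical stand-in for the scheduler-side resolution."""
    if not anchors:
        # No temporal mapper produced an anchor: fall back to the carried date.
        return carried_partition_date
    if len(anchors) > 1:
        # Assumption: feeding assets that disagree yield no partition date.
        return None
    # A single agreed anchor takes precedence over the carried identity date.
    return next(iter(anchors))


anchor = datetime(2026, 5, 20, tzinfo=timezone.utc)
carried = datetime(2026, 5, 19, tzinfo=timezone.utc)
print(resolve_partition_date_sketch({anchor}, carried))  # anchor wins
print(resolve_partition_date_sketch(set(), carried))     # carried date is used
```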
   



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -695,6 +722,27 @@ def _get_or_create_apdr(
                 .limit(1)
             )
             if latest_apdr and latest_apdr.created_dag_run_id is None:
+                existing_partition_date = latest_apdr.partition_date
+                if (
+                    existing_partition_date is not None
+                    and target_partition_date is not None
+                    and existing_partition_date != target_partition_date

Review Comment:
   @kaxil good catch on the None-then-`D` drop. That was an unintended gap 
rather than a deliberate choice. I've changed it to adopt the incoming date when 
the APDR carries none yet, so a later identity date is no longer dropped. That 
also makes suppression non-sticky (a post-conflict `None` is re-adoptable). 
Genuine `D1`/`D2` conflicts still suppress to `None`. I've added tests for the 
adopt and recover-after-conflict paths.
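
The merge rules in this comment can be written as a small pure function. This is a sketch under stated assumptions, not the code in `_get_or_create_apdr`: `merge_partition_date` is a hypothetical name, and keeping the existing date when the incoming one is `None` is an assumption inferred from the guard in the diff (a conflict requires both dates to be set).

```python
from __future__ import annotations

from datetime import datetime, timezone


def merge_partition_date(
    existing: datetime | None,
    incoming: datetime | None,
) -> datetime | None:
    """Hypothetical merge of an APDR's stored date with an incoming one."""
    if existing is None:
        # Adopt path: nothing stored yet (or suppressed earlier), take incoming.
        return incoming
    if incoming is None:
        # Assumption: an event without a date does not clear a stored date.
        return existing
    if existing != incoming:
        # Genuine D1/D2 conflict: suppress to None.
        return None
    return existing


d1 = datetime(2026, 5, 20, tzinfo=timezone.utc)
d2 = datetime(2026, 5, 21, tzinfo=timezone.utc)

after_conflict = merge_partition_date(d1, d2)      # conflict suppresses
recovered = merge_partition_date(after_conflict, d2)  # None is re-adoptable
print(after_conflict, recovered)
```

Because a suppressed value is plain `None`, the adopt branch also covers recovery after a conflict, which is what makes suppression non-sticky in this sketch.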
   



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2027,6 +2028,39 @@ def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms
         context = runtime_ti.get_template_context()
         assert context["partition_key"] == "some-partition"
 
+    def test_partition_date_in_context(self, create_runtime_ti, mock_supervisor_comms):
+        """Test that partition_date from dag_run is exposed in the template context."""
+        task = BaseOperator(task_id="hello")
+        runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+        dr = runtime_ti._ti_context_from_server.dag_run
+
+        mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+            data_interval_end=dr.logical_date - timedelta(hours=1),
+            data_interval_start=dr.logical_date - timedelta(hours=2),
+            start_date=dr.start_date - timedelta(hours=1),
+            end_date=dr.start_date,
+        )
+
+        context = runtime_ti.get_template_context()
+
+        # Default: partition_date is None
+        assert context["partition_date"] is None
+
+        # Set partition_date on dag_run and verify it surfaces in context
+        partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+        dr.partition_date = partition_date
+        context = runtime_ti.get_template_context()
+        assert context["partition_date"] == partition_date
+
+        # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts` filters
+        # operate on a real awareness boundary.
+        from datetime import datetime as _datetime

Review Comment:
   @kaxil done, dropped the aliased inline import and used the module-level 
`datetime`.
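
For context, the naive-to-aware coercion that this test exercises can be illustrated with the standard library alone. This is a hedged sketch: `coerce_to_aware` is a hypothetical helper, and defaulting to UTC is an assumption. It does not show how the Task SDK performs the coercion.

```python
from __future__ import annotations

from datetime import datetime, timezone, tzinfo


def coerce_to_aware(value: datetime, default_tz: tzinfo = timezone.utc) -> datetime:
    """Attach default_tz to a naive datetime; leave aware datetimes untouched."""
    if value.tzinfo is None or value.utcoffset() is None:
        # Naive: interpret the wall-clock value as being in default_tz.
        return value.replace(tzinfo=default_tz)
    return value


naive = datetime(2026, 5, 20, 1, 0, 0)
aware = coerce_to_aware(naive)
print(aware.isoformat())
```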
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
