Lee-W commented on code in PR #67285:
URL: https://github.com/apache/airflow/pull/67285#discussion_r3401377839


##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -670,6 +735,31 @@ def _make_asset_model(
     return model
 
 
+class TestComputeTargetPartitionDate:
+    def test_identity_mapper_passes_source_date_through(self):
+        from airflow.assets.manager import _compute_target_partition_date
+        from airflow.partition_mappers.identity import IdentityMapper

Review Comment:
   Could we move these inline imports to the top level? Thanks!



##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -670,6 +735,31 @@ def _make_asset_model(
     return model
 
 
+class TestComputeTargetPartitionDate:
+    def test_identity_mapper_passes_source_date_through(self):
+        from airflow.assets.manager import _compute_target_partition_date
+        from airflow.partition_mappers.identity import IdentityMapper
+
+        source_date = timezone.parse("2026-05-20T01:00:00")
+        result = _compute_target_partition_date(
+            mapper=IdentityMapper(),
+            source_partition_date=source_date,
+        )
+        assert result == source_date
+
+    def test_non_identity_mapper_returns_none(self):
+        """Only IdentityMapper carries the source date here; temporal/composite mappers
+        are resolved from the key by the scheduler via to_partition_date, so this returns None."""
+        from airflow.assets.manager import _compute_target_partition_date

Review Comment:
   Same here: could these inline imports move to the top level too?



##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -670,6 +735,31 @@ def _make_asset_model(
     return model
 
 
+class TestComputeTargetPartitionDate:
+    def test_identity_mapper_passes_source_date_through(self):
+        from airflow.assets.manager import _compute_target_partition_date
+        from airflow.partition_mappers.identity import IdentityMapper
+
+        source_date = timezone.parse("2026-05-20T01:00:00")
+        result = _compute_target_partition_date(
+            mapper=IdentityMapper(),
+            source_partition_date=source_date,
+        )
+        assert result == source_date
+
+    def test_non_identity_mapper_returns_none(self):
+        """Only IdentityMapper carries the source date here; temporal/composite mappers
+        are resolved from the key by the scheduler via to_partition_date, so this returns None."""
+        from airflow.assets.manager import _compute_target_partition_date
+        from airflow.partition_mappers.temporal import StartOfDayMapper
+
+        result = _compute_target_partition_date(

Review Comment:
   Hmm, or we could probably just remove this function.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -691,6 +746,27 @@ def _get_or_create_apdr(
                 .limit(1)
             )
             if latest_apdr and latest_apdr.created_dag_run_id is None:
+                existing_partition_date = latest_apdr.partition_date
+                if (
+                    existing_partition_date is not None
+                    and target_partition_date is not None
+                    and existing_partition_date != target_partition_date
+                ):
+                    # Two contributing events carry conflicting partition_dates for the same
+                    # (target_key, target_dag). Choosing one would be order-dependent, so
+                    # suppress: the consumer DagRun gets partition_date=None rather than a
+                    # wrong, unstable value.
+                    log.warning(
+                        "Conflicting partition_date carried for the same target key; "
+                        "suppressing it so the consumer DagRun's partition_date is None. "
+                        "The producing assets likely disagree on the partition's datetime.",
+                        target_dag_id=target_dag.dag_id,
+                        target_key=target_key,
+                        existing_partition_date=existing_partition_date,
+                        incoming_partition_date=target_partition_date,
+                    )
+                    latest_apdr.partition_date = None

Review Comment:
   We can set the DagRun's partition_date to None, but do we also need to set
   the one on the APDR to None?
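
To make the question concrete, here is a minimal sketch of the conflict check from the hunk above, using plain `datetime` values instead of the real APDR model. The function name `partition_dates_conflict` and the variable `stored` are hypothetical and only stand in for the condition and for `latest_apdr.partition_date`; what the real code does with a later event is not shown in this diff.

```python
from __future__ import annotations

from datetime import datetime, timezone


def partition_dates_conflict(existing: datetime | None, incoming: datetime | None) -> bool:
    """Mirror the three-part condition in the hunk: both dates set, and different."""
    return existing is not None and incoming is not None and existing != incoming


first = datetime(2026, 5, 20, 1, 0, tzinfo=timezone.utc)
second = datetime(2026, 5, 20, 2, 0, tzinfo=timezone.utc)

# `stored` stands in for latest_apdr.partition_date (hypothetical name).
stored: datetime | None = first
if partition_dates_conflict(stored, second):
    # Same suppression as the hunk: drop the stored value on conflict.
    stored = None

# Once the stored value is None, the same check no longer fires for later events.
later_event_conflicts = partition_dates_conflict(stored, second)
```

In this sketch a stored `None` after suppression looks the same as "no date recorded yet", so the check alone cannot tell the two cases apart.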



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -63,6 +67,30 @@
 log = structlog.get_logger(__name__)
 
 
+def _compute_target_partition_date(

Review Comment:
   We can move the logic into `_queue_partitioned_dags`. It does not provide
   much value as a standalone function.
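
A rough sketch of what inlining could look like, assuming the helper does no more than what the tests in this PR describe (pass the source date through for `IdentityMapper`, return `None` otherwise). The classes below are local stand-ins, not the Airflow ones, and `queue_targets` is a hypothetical placeholder for the call site, not the real `_queue_partitioned_dags`.

```python
from __future__ import annotations

from datetime import datetime, timezone


class PartitionMapper:
    """Stand-in base class; not the Airflow class."""


class IdentityMapper(PartitionMapper):
    """Stand-in for the identity mapper."""


class StartOfDayMapper(PartitionMapper):
    """Stand-in for a temporal mapper."""


def queue_targets(
    mappers_by_dag: dict[str, PartitionMapper],
    source_partition_date: datetime | None,
) -> dict[str, datetime | None]:
    """Hypothetical call site with the helper's logic inlined."""
    targets: dict[str, datetime | None] = {}
    for dag_id, mapper in mappers_by_dag.items():
        # Inlined: only the identity mapping carries the source date through.
        targets[dag_id] = source_partition_date if isinstance(mapper, IdentityMapper) else None
    return targets


source_date = datetime(2026, 5, 20, 1, 0, tzinfo=timezone.utc)
result = queue_targets(
    {"identity_dag": IdentityMapper(), "daily_dag": StartOfDayMapper()},
    source_date,
)
```

If the real helper is this small, the existing tests could exercise the behavior through the call site instead of importing a private function.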


