anishgirianish commented on code in PR #66782:
URL: https://github.com/apache/airflow/pull/66782#discussion_r3249119379
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1476,10 +1483,37 @@ def register_asset_changes_in_db(
SerializedAssetUriRef,
)
- # TODO: AIP-76 should we provide an interface to override this, so that the task can
- # tell the truth if for some reason it touches a different partition?
- # https://github.com/apache/airflow/issues/58474
- partition_key = ti.dag_run.partition_key
+ payloads_by_asset: dict[SerializedAssetUniqueKey, list[OutletEventPayload]] = defaultdict(list)
+ for outlet_event in outlet_events:
+ # Alias-emitted events are handled separately further down via
+ # register_asset_change_for_alias, which uses the DagRun-level
+ # partition_key. Per-emission partition keys do not fan out through
+ # the alias path — emission via an alias produces one event per
+ # resolved asset, all carrying the same dag_run_partition_key.
+ if "source_alias_name" in outlet_event:
+ continue
+ asset_key = SerializedAssetUniqueKey(**outlet_event["dest_asset_key"])
+ payloads_by_asset[asset_key].append(
+ OutletEventPayload(
+ extra=outlet_event["extra"], partition_key=outlet_event.get("partition_key")
+ )
+ )
+
+ # Back-fill DagRun.partition_key from the task emission when the task
+ # emitted exactly one distinct partition_key across all outlet events
+ # and the DagRun did not already have one set. This lets a task that
+ # discovers the partition at runtime (rather than via params) act as
+ # the source of truth for the DagRun-level key.
+ runtime_pks: set[str] = {
+ payload.partition_key
+ for payloads in payloads_by_asset.values()
+ for payload in payloads
+ if payload.partition_key is not None
+ }
Review Comment:
Updated, thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]