anishgirianish commented on code in PR #66782:
URL: https://github.com/apache/airflow/pull/66782#discussion_r3249169487
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1506,11 +1540,27 @@ def register_asset_changes_in_db(
)
}
- asset_event_extras: dict[SerializedAssetUniqueKey, dict] = {
- SerializedAssetUniqueKey(**event["dest_asset_key"]): event["extra"]
- for event in outlet_events
- if "source_alias_name" not in event
- }
+ def _register(am: AssetModel, key: SerializedAssetUniqueKey) -> None:
+ payloads_for_asset = payloads_by_asset.get(key, [])
+ if not payloads_for_asset:
+ asset_manager.register_asset_change(
+ task_instance=ti,
+ asset=am,
+ extra=None,
+ partition_key=dag_run_partition_key,
+ session=session,
+ )
+ return
+ for payload in payloads_for_asset:
+ asset_manager.register_asset_change(
+ task_instance=ti,
+ asset=am,
+ extra=payload.extra,
+ partition_key=payload.partition_key
+ if payload.partition_key is not None
+ else dag_run_partition_key,
Review Comment:
Agreed, dropped the fallback. I kept it only in the empty-payloads branch;
happy to drop that too if you'd prefer. Thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]