anishgirianish commented on code in PR #65447:
URL: https://github.com/apache/airflow/pull/65447#discussion_r3222706541


##########
task-sdk/src/airflow/sdk/definitions/asset/decorators.py:
##########
@@ -36,9 +36,33 @@
     from airflow.sdk.bases.decorator import _TaskDecorator
     from airflow.sdk.definitions.dag import DagStateChangeCallback, ScheduleArg
     from airflow.sdk.definitions.param import ParamsDict
+    from airflow.sdk.types import OutletEventAccessorsProtocol
     from airflow.triggers.base import BaseTrigger
 
 
+_INVALID_INLET_ASSET_NAMES = ("self", "context", "outlet_events")
+
+
+class _AssetSelfProxy:
+    """Proxy for ``self`` in ``@asset`` functions; intercepts ``partition_keys`` writes and forwards them to the outlet event accessor."""
+
+    def __init__(self, asset: Asset, outlet_events: OutletEventAccessorsProtocol) -> None:
+        object.__setattr__(self, "_asset", asset)
+        object.__setattr__(self, "_outlet_events", outlet_events)
+
+    def __getattr__(self, name: str) -> Any:
+        if name == "partition_keys":
+            return self._outlet_events[self._asset].partition_keys
+        return getattr(self._asset, name)
+
+    def __setattr__(self, name: str, value: Any) -> None:
+        if name != "partition_keys":
+            raise AttributeError(
+                f"Cannot set {name!r} on @asset self; only 'partition_keys' is settable at runtime"
+            )
+        self._outlet_events[self._asset].partition_keys = value

Review Comment:
   Removed, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to