anishgirianish commented on code in PR #65447:
URL: https://github.com/apache/airflow/pull/65447#discussion_r3222706541
##########
task-sdk/src/airflow/sdk/definitions/asset/decorators.py:
##########
@@ -36,9 +36,33 @@
from airflow.sdk.bases.decorator import _TaskDecorator
from airflow.sdk.definitions.dag import DagStateChangeCallback, ScheduleArg
from airflow.sdk.definitions.param import ParamsDict
+ from airflow.sdk.types import OutletEventAccessorsProtocol
from airflow.triggers.base import BaseTrigger
+_INVALID_INLET_ASSET_NAMES = ("self", "context", "outlet_events")
+
+
+class _AssetSelfProxy:
+ """Proxy for ``self`` in ``@asset`` functions; intercepts
``partition_keys`` writes and forwards them to the outlet event accessor."""
+
+ def __init__(self, asset: Asset, outlet_events:
OutletEventAccessorsProtocol) -> None:
+ object.__setattr__(self, "_asset", asset)
+ object.__setattr__(self, "_outlet_events", outlet_events)
+
+ def __getattr__(self, name: str) -> Any:
+ if name == "partition_keys":
+ return self._outlet_events[self._asset].partition_keys
+ return getattr(self._asset, name)
+
+ def __setattr__(self, name: str, value: Any) -> None:
+ if name != "partition_keys":
+ raise AttributeError(
+ f"Cannot set {name!r} on @asset self; only 'partition_keys' is
settable at runtime"
+ )
+ self._outlet_events[self._asset].partition_keys = value
Review Comment:
Removed, thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]