Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-11 Thread via GitHub


Lee-W merged PR #44639:
URL: https://github.com/apache/airflow/pull/44639


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879480154


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __iter__(self) -> Iterator[str]:
-return iter(self._dict)
+def __iter__(self) -> Iterator[Asset | AssetAlias]:
+return (
+key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias() for key in self._dict
+)
 
 def __len__(self) -> int:
 return len(self._dict)
 
-def __getitem__(self, key: str | Asset | AssetAlias) -> 
OutletEventAccessor:
-event_key = extract_event_key(key)
-if event_key not in self._dict:
-self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key)
-return self._dict[event_key]
+def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor:
+hashable_key: BaseAssetUniqueKey
+if isinstance(key, Asset):
+hashable_key = AssetUniqueKey.from_asset(key)
+elif isinstance(key, AssetAlias):
+hashable_key = AssetAliasUniqueKey.from_asset_alias(key)
+else:
+raise KeyError("Key should be either an asset or an asset alias")

Review Comment:
   Addressed. Will merge this PR once CI passes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879425317


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __iter__(self) -> Iterator[str]:
-return iter(self._dict)
+def __iter__(self) -> Iterator[Asset | AssetAlias]:
+return (
+key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias() for key in self._dict
+)
 
 def __len__(self) -> int:
 return len(self._dict)
 
-def __getitem__(self, key: str | Asset | AssetAlias) -> 
OutletEventAccessor:
-event_key = extract_event_key(key)
-if event_key not in self._dict:
-self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key)
-return self._dict[event_key]
+def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor:
+hashable_key: BaseAssetUniqueKey
+if isinstance(key, Asset):
+hashable_key = AssetUniqueKey.from_asset(key)
+elif isinstance(key, AssetAlias):
+hashable_key = AssetAliasUniqueKey.from_asset_alias(key)
+else:
+raise KeyError("Key should be either an asset or an asset alias")

Review Comment:
   I think Python containers generally raise TypeError for this.



##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __i

Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879428172


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):

Review Comment:
   Sounds good. We should also support AssetRef.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879430227


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):

Review Comment:
   Yep, it is tracked here: https://github.com/apache/airflow/issues/44834



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879427933


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __iter__(self) -> Iterator[str]:
-return iter(self._dict)
+def __iter__(self) -> Iterator[Asset | AssetAlias]:
+return (
+key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias() for key in self._dict
+)
 
 def __len__(self) -> int:
 return len(self._dict)
 
-def __getitem__(self, key: str | Asset | AssetAlias) -> 
OutletEventAccessor:
-event_key = extract_event_key(key)
-if event_key not in self._dict:
-self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key)
-return self._dict[event_key]
+def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor:
+hashable_key: BaseAssetUniqueKey
+if isinstance(key, Asset):
+hashable_key = AssetUniqueKey.from_asset(key)
+elif isinstance(key, AssetAlias):
+hashable_key = AssetAliasUniqueKey.from_asset_alias(key)
+else:
+raise KeyError("Key should be either an asset or an asset alias")

Review Comment:
   yep, will update it. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879427454


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):

Review Comment:
   We probably should, but I think it would be better to support it in the 
next PR. The purpose of this PR is to add `Asset.name` support to the existing 
feature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-10 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879426240


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], 
OutletEventAccessor]):

Review Comment:
   I wonder if we should support AssetRef here… Not important either way, we 
can do it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1877175572


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_asset(self) -> Asset:
+return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+name: str
+
+@staticmethod
+def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+return AssetAliasUniqueKey(name=asset_alias.name)
+
+def to_asset_alias(self) -> AssetAlias:
+return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
   Changed it to an attrs class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1877031423


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_asset(self) -> Asset:
+return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+name: str
+
+@staticmethod
+def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+return AssetAliasUniqueKey(name=asset_alias.name)
+
+def to_asset_alias(self) -> AssetAlias:
+return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
   Yep, this is what I expected (for easier serialization). But on second 
thought, we could probably make it a class so that we avoid having to 
handle a migration in the future (e.g., from a serialized named-tuple key to 
something with an identity check).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1876732671


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_asset(self) -> Asset:
+return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+name: str
+
+@staticmethod
+def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+return AssetAliasUniqueKey(name=asset_alias.name)
+
+def to_asset_alias(self) -> AssetAlias:
+return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
   Just a by-the-way, not sure if you already know—these two classes aren’t 
actually distinct, because namedtuples in Python are still fundamentally 
tuples. Specifically,
   
   ```pycon
   >>> from collections import namedtuple
   >>> A = namedtuple("A", "x y")
   >>> B = namedtuple("B", "x y")
   >>> A(1, 2) == B(1, 2)
   True
   ```
   
   The only reason this approach currently works is that AssetUniqueKey and 
AssetAliasUniqueKey have a different number of members (2 vs 1). They might as 
well be free functions returning tuples. Nothing we must change here though (it 
works after all); we can always fix this later if it fails to work in the 
future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1876229094


##
airflow/utils/operator_helpers.py:
##
@@ -276,10 +277,14 @@ def _run():
 
 for metadata in _run():
 if isinstance(metadata, Metadata):
-outlet_events[metadata.uri].extra.update(metadata.extra)
+# TODO: handle asset name
+
outlet_events[Asset(uri=metadata.uri)].extra.update(metadata.extra)
 
 if metadata.alias_name:
-outlet_events[metadata.alias_name].add(metadata.uri, 
extra=metadata.extra)
+# TODO: handle asset name
+
outlet_events[AssetAlias(name=metadata.alias_name)].add(
+Asset(uri=metadata.uri), extra=metadata.extra
+)

Review Comment:
   Removed string access and created a deprecation PR: 
https://github.com/apache/airflow/pull/44791



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875661261


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   ```suggestion
   def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
   ```
   
   Right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875656227


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   removed



##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key
 else:
 return
 
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[str, BaseAsset], 
OutletEventAccessor]):

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875678174


##
airflow/utils/operator_helpers.py:
##
@@ -276,10 +277,14 @@ def _run():
 
 for metadata in _run():
 if isinstance(metadata, Metadata):
-outlet_events[metadata.uri].extra.update(metadata.extra)
+# TODO: handle asset name
+
outlet_events[Asset(uri=metadata.uri)].extra.update(metadata.extra)
 
 if metadata.alias_name:
-outlet_events[metadata.alias_name].add(metadata.uri, 
extra=metadata.extra)
+# TODO: handle asset name
+
outlet_events[AssetAlias(name=metadata.alias_name)].add(
+Asset(uri=metadata.uri), extra=metadata.extra
+)

Review Comment:
   Let’s deprecate passing in a URI to Metadata in 2.11 and require an Asset 
there as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875676469


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset | AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __iter__(self) -> Iterator[str]:
-return iter(self._dict)
+def __iter__(self) -> Iterator[Asset | AssetAlias]:
+return iter(
+key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias() for key in self._dict
+)

Review Comment:
   ah right, Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875673767


##
airflow/utils/context.py:
##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
 """Add an AssetEvent to an existing Asset."""
-if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+if not isinstance(self.key, AssetAliasUniqueKey):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
-else:
-return
-
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+asset_alias_name = self.key.name
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset | AssetAlias], 
OutletEventAccessor]):
 """
 Lazy mapping of outlet asset event accessors.
 
 :meta private:
 """
 
 def __init__(self) -> None:
-self._dict: dict[str, OutletEventAccessor] = {}
+self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {}
 
 def __str__(self) -> str:
 return f"OutletEventAccessors(_dict={self._dict})"
 
-def __iter__(self) -> Iterator[str]:
-return iter(self._dict)
+def __iter__(self) -> Iterator[Asset | AssetAlias]:
+return iter(
+key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias() for key in self._dict
+)

Review Comment:
   ```suggestion
   return (
   key.to_asset() if isinstance(key, AssetUniqueKey) else 
key.to_asset_alias()
   for key in self._dict
   )
   ```
   
   This is already an iterator, no need to call `iter()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875670102


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)

Review Comment:
   Ah sorry, I think the one you mentioned is outdated code. Not sure why it 
was still there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875668352


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875662630


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)

Review Comment:
   Is this a good idea? We can’t be sure if the user intends this to be a name 
or URI. Maybe we shouldn’t allow it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875661261


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   ```suggestion
   def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
   ```
   
   Right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875656645


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key

Review Comment:
   this has been removed as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875609015


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   I think we can just remove them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550638


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   ~~Yep, this is one name I kinda struggle with 🤦‍♂️ I can use `from_obj` and 
`to_obj` for now till we have a better name. I'm not sure whether users need 
this function. Could we probably mark it private as well?~~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r187591


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   > Or maybe we should just call these to_asset and to_asset_alias. We really 
only need polymorphism in one place (the outlet event accessor); it can be 
pretty easily avoided with an if-else condition.
   
   Yep, it would work, and that's the main reason I named it this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550638


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   Yep, this is one name I kinda struggle with 🤦‍♂️ I can use `from_obj` and 
`to_obj` for now till we have a better name. I'm not sure whether users need 
this function. Could we probably mark it private as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550431


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   Or maybe we should just call these `to_asset` and `to_asset_alias`. We 
really only need polymorphism in one place (the outlet event accessor); it can 
be pretty easily avoided.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550431


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   Or maybe we should just call these `to_asset` and `to_asset_alias`. We 
really only need polymorphism in one place (the outlet event accessor); it can 
be pretty easily avoided with an if-else condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875547997


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key
 else:
 return
 
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[str, BaseAsset], 
OutletEventAccessor]):

Review Comment:
   Sounds good 👍 will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875546737


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
 uri: str
 
 @staticmethod
-def from_asset(asset: Asset) -> AssetUniqueKey:
+def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
 return AssetUniqueKey(name=asset.name, uri=asset.uri)
 
+def to_obj(self) -> Asset:

Review Comment:
   I wonder if there’s a better name than `obj`… Python doesn’t seem to have an 
established convention for this. (Rust has `into()`, for example.)
   
   Also maybe the `from` part should also be `from_obj`? Not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875547133


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   I think we already have a deprecation warning for this one 
https://github.com/apache/airflow/blob/99e713efa398bef9d76eb4e8145538e828a05720/airflow/utils/context.py#L180
   
   Could I just remove all of this string handling?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875546378


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key

Review Comment:
   Yep, 
https://github.com/apache/airflow/blob/99e713efa398bef9d76eb4e8145538e828a05720/airflow/utils/context.py#L227
 has already deprecated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875544423


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key
 else:
 return
 
-event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+event = AssetAliasEvent(
+source_alias_name=asset_alias_name,
+dest_asset_key=AssetUniqueKey.from_asset(asset),
+extra=extra or {},
+)
 self.asset_alias_events.append(event)
 
 
-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[str, BaseAsset], 
OutletEventAccessor]):

Review Comment:
   This should be something like `Asset | AssetAlias` instead of `BaseAsset` 
since not all BaseAsset subclasses make sense here (for example it’d be awkward 
if the user passes in an AssetAny).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875543526


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   The main reason for keeping it is 
https://github.com/apache/airflow/issues/43956#issuecomment-2511443068. It is 
supposed to be removed eventually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875542390


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:
 """Add an AssetEvent to an existing Asset."""
 if isinstance(asset, str):
-asset_uri = asset
-elif isinstance(asset, Asset):
-asset_uri = asset.uri
-else:
+asset = Asset(asset)
+elif not isinstance(asset, Asset):
 return
 
-if isinstance(self.raw_key, str):
-asset_alias_name = self.raw_key
-elif isinstance(self.raw_key, AssetAlias):
-asset_alias_name = self.raw_key.name
+if isinstance(self.key, AssetAliasUniqueKey):
+asset_alias_name = self.key.name
+elif isinstance(self.key, str):
+# TODO: deprecate string access
+asset_alias_name = self.key

Review Comment:
   We should remove this in 3.0 and deprecate in 2.11.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]

2024-12-09 Thread via GitHub


uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1875541501


##
airflow/utils/context.py:
##
@@ -161,54 +178,89 @@ class OutletEventAccessor:
 :meta private:
 """
 
-raw_key: str | Asset | AssetAlias
+key: str | BaseAssetUniqueKey
 extra: dict[str, Any] = attrs.Factory(dict)
 asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> 
None:
+def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> 
None:

Review Comment:
   Not sure if we want to support str here, based on the same reasoning. Maybe 
this should simply accept only Asset. (We can add AssetRef support later.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org