Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W merged PR #44639: URL: https://github.com/apache/airflow/pull/44639 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879480154 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. 
:meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __iter__(self) -> Iterator[str]: -return iter(self._dict) +def __iter__(self) -> Iterator[Asset | AssetAlias]: +return ( +key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict +) def __len__(self) -> int: return len(self._dict) -def __getitem__(self, key: str | Asset | AssetAlias) -> OutletEventAccessor: -event_key = extract_event_key(key) -if event_key not in self._dict: -self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key) -return self._dict[event_key] +def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor: +hashable_key: BaseAssetUniqueKey +if isinstance(key, Asset): +hashable_key = AssetUniqueKey.from_asset(key) +elif isinstance(key, AssetAlias): +hashable_key = AssetAliasUniqueKey.from_asset_alias(key) +else: +raise KeyError("Key should be either an asset or an asset alias") Review Comment: Addressed. will merge this PR once CI passed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
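The keying scheme in the diff above can be sketched outside Airflow. This is a minimal illustration under stated assumptions, not Airflow's implementation: `Asset`, `AssetAlias`, `AssetKey`, `AliasKey`, `Accessor`, and `Accessors` are hypothetical stand-ins built with `dataclasses`, and the accessor is reduced to a bare `extra` dict. It shows how normalizing each key to a hashable (name, uri) value keeps two assets that share a URI but differ in name in separate entries.

```python
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, field
from typing import Any, Union


@dataclass(frozen=True)
class Asset:  # hypothetical stand-in, not airflow's Asset
    name: str
    uri: str


@dataclass(frozen=True)
class AssetAlias:  # hypothetical stand-in, not airflow's AssetAlias
    name: str


@dataclass(frozen=True)
class AssetKey:  # plays the role of AssetUniqueKey in the diff
    name: str
    uri: str


@dataclass(frozen=True)
class AliasKey:  # plays the role of AssetAliasUniqueKey in the diff
    name: str


@dataclass
class Accessor:
    extra: dict = field(default_factory=dict)


class Accessors(Mapping):
    """Lazy mapping: an entry is created the first time a key is accessed."""

    def __init__(self) -> None:
        self._dict: dict = {}

    def __iter__(self) -> Iterator[Union[Asset, AssetAlias]]:
        # Convert the internal hashable keys back to user-facing objects.
        return (
            Asset(k.name, k.uri) if isinstance(k, AssetKey) else AssetAlias(k.name)
            for k in self._dict
        )

    def __len__(self) -> int:
        return len(self._dict)

    def __getitem__(self, key: Any) -> Accessor:
        if isinstance(key, Asset):
            hashable = AssetKey(key.name, key.uri)
        elif isinstance(key, AssetAlias):
            hashable = AliasKey(key.name)
        else:
            raise TypeError("Key should be either an asset or an asset alias")
        if hashable not in self._dict:
            self._dict[hashable] = Accessor()
        return self._dict[hashable]


events = Accessors()
events[Asset("orders", "s3://bucket/data")].extra["rows"] = 10
events[Asset("refunds", "s3://bucket/data")].extra["rows"] = 3
events[AssetAlias("orders")].extra["note"] = "alias"
```

Because the name is part of the key, the two assets above do not overwrite each other even though their URIs match, and the alias named `orders` stays separate from the asset named `orders`.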
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879425317 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. 
:meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __iter__(self) -> Iterator[str]: -return iter(self._dict) +def __iter__(self) -> Iterator[Asset | AssetAlias]: +return ( +key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict +) def __len__(self) -> int: return len(self._dict) -def __getitem__(self, key: str | Asset | AssetAlias) -> OutletEventAccessor: -event_key = extract_event_key(key) -if event_key not in self._dict: -self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key) -return self._dict[event_key] +def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor: +hashable_key: BaseAssetUniqueKey +if isinstance(key, Asset): +hashable_key = AssetUniqueKey.from_asset(key) +elif isinstance(key, AssetAlias): +hashable_key = AssetAliasUniqueKey.from_asset_alias(key) +else: +raise KeyError("Key should be either an asset or an asset alias") Review Comment: I think Python containers generally raise TypeError for this. 
## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. :meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __i
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879428172 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): Review Comment: Sounds good. We should also support AssetRef. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879430227 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): Review Comment: Yep, track it here https://github.com/apache/airflow/issues/44834 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879427933 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. 
:meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __iter__(self) -> Iterator[str]: -return iter(self._dict) +def __iter__(self) -> Iterator[Asset | AssetAlias]: +return ( +key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict +) def __len__(self) -> int: return len(self._dict) -def __getitem__(self, key: str | Asset | AssetAlias) -> OutletEventAccessor: -event_key = extract_event_key(key) -if event_key not in self._dict: -self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key) -return self._dict[event_key] +def __getitem__(self, key: Asset | AssetAlias) -> OutletEventAccessor: +hashable_key: BaseAssetUniqueKey +if isinstance(key, Asset): +hashable_key = AssetUniqueKey.from_asset(key) +elif isinstance(key, AssetAlias): +hashable_key = AssetAliasUniqueKey.from_asset_alias(key) +else: +raise KeyError("Key should be either an asset or an asset alias") Review Comment: yep, will update it. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
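The TypeError suggestion discussed above can be illustrated with a small sketch. `StrictMapping` and `raised` are hypothetical names introduced only for this example; the point is that built-in containers report a key of the wrong type with `TypeError` and reserve `KeyError` for a well-typed key that is simply missing.

```python
from collections.abc import Mapping


class StrictMapping(Mapping):
    """Hypothetical read-only mapping that accepts only int keys."""

    def __init__(self, data):
        self._data = dict(data)

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __getitem__(self, key):
        if not isinstance(key, int):
            # Wrong kind of key: a programming error, not a missing entry.
            raise TypeError(f"key must be int, got {type(key).__name__}")
        return self._data[key]  # raises KeyError when the key is absent


def raised(func):
    """Return the exception type raised by func(), or None."""
    try:
        func()
    except Exception as exc:
        return type(exc)
    return None


items = [1, 2]
table = {}
unhashable = []
m = StrictMapping({1: "one"})

list_error = raised(lambda: items["a"])        # built-in list, wrong index type
dict_error = raised(lambda: table[unhashable])  # built-in dict, unhashable key
missing_error = raised(lambda: m[2])            # valid key type, absent
wrong_type_error = raised(lambda: m["1"])       # invalid key type
fallback = m.get(2, "fallback")                 # Mapping.get swallows KeyError only
```

One practical consequence of the choice: the `Mapping` mixin methods `get` and `__contains__` catch `KeyError` only, so raising `KeyError` for a wrong-typed key would make `"1" in m` quietly evaluate to `False`, whereas `TypeError` surfaces the mistake.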
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1879427454

## airflow/utils/context.py: ##
@@ -161,54 +176,57 @@ class OutletEventAccessor:
     :meta private:
     """

-    raw_key: str | Asset | AssetAlias
+    key: BaseAssetUniqueKey
     extra: dict[str, Any] = attrs.Factory(dict)
     asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)

-    def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None:
+    def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None:
         """Add an AssetEvent to an existing Asset."""
-        if isinstance(asset, str):
-            asset_uri = asset
-        elif isinstance(asset, Asset):
-            asset_uri = asset.uri
-        else:
+        if not isinstance(self.key, AssetAliasUniqueKey):
             return

-        if isinstance(self.raw_key, str):
-            asset_alias_name = self.raw_key
-        elif isinstance(self.raw_key, AssetAlias):
-            asset_alias_name = self.raw_key.name
-        else:
-            return
-
-        event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {})
+        asset_alias_name = self.key.name
+        event = AssetAliasEvent(
+            source_alias_name=asset_alias_name,
+            dest_asset_key=AssetUniqueKey.from_asset(asset),
+            extra=extra or {},
+        )
         self.asset_alias_events.append(event)


-class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]):

Review Comment:
We probably should, but I think it would be better to support it in the next PR. The purpose of this PR is to add `Asset.name` support to the existing feature.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1879426240 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset, AssetAlias], OutletEventAccessor]): Review Comment: I wonder if we should support int here… Not important either way, we can do it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1877175572

## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
     uri: str

     @staticmethod
-    def from_asset(asset: Asset) -> AssetUniqueKey:
+    def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
         return AssetUniqueKey(name=asset.name, uri=asset.uri)

+    def to_asset(self) -> Asset:
+        return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+    name: str
+
+    @staticmethod
+    def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+        return AssetAliasUniqueKey(name=asset_alias.name)
+
+    def to_asset_alias(self) -> AssetAlias:
+        return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
Changed it to an attrs class.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1877031423

## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
     uri: str

     @staticmethod
-    def from_asset(asset: Asset) -> AssetUniqueKey:
+    def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
         return AssetUniqueKey(name=asset.name, uri=asset.uri)

+    def to_asset(self) -> Asset:
+        return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+    name: str
+
+    @staticmethod
+    def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+        return AssetAliasUniqueKey(name=asset_alias.name)
+
+    def to_asset_alias(self) -> AssetAlias:
+        return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
Yep, this is what I expected (for easier serialization). But on second thought, we could probably make it a class so that we avoid having to handle a migration in the future (e.g., from a serialized named tuple key to something with an identity check).
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1876732671

## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ##
@@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple):
     uri: str

     @staticmethod
-    def from_asset(asset: Asset) -> AssetUniqueKey:
+    def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey:
         return AssetUniqueKey(name=asset.name, uri=asset.uri)

+    def to_asset(self) -> Asset:
+        return Asset(name=self.name, uri=self.uri)
+
+
+class AssetAliasUniqueKey(NamedTuple):
+    name: str
+
+    @staticmethod
+    def from_asset_alias(asset_alias: AssetAlias) -> AssetAliasUniqueKey:
+        return AssetAliasUniqueKey(name=asset_alias.name)
+
+    def to_asset_alias(self) -> AssetAlias:
+        return AssetAlias(name=self.name)
+
+
+BaseAssetUniqueKey = Union[AssetUniqueKey, AssetAliasUniqueKey]

Review Comment:
Just a by-the-way, in case you don't already know: these two classes aren't actually distinct, because named tuples in Python are still fundamentally tuples. Specifically,

```pycon
>>> from collections import namedtuple
>>> A = namedtuple("A", "x y")
>>> B = namedtuple("B", "x y")
>>> A(1, 2) == B(1, 2)
True
```

The only reason this approach currently works is that AssetUniqueKey and AssetAliasUniqueKey have different numbers of members (2 vs 1). They might as well be free functions returning tuples. Nothing we must change here though (it works after all); we can always fix this later if it stops working.
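The named-tuple point above, and why a regular class avoids it, can be sketched as follows. The names `TupleKeyA`, `TupleKeyB`, `ClassKeyA`, and `ClassKeyB` are hypothetical, and the sketch uses stdlib frozen dataclasses as a stand-in for the attrs classes mentioned later in the thread; both libraries generate an `__eq__` that only compares instances of the same class.

```python
from collections import namedtuple
from dataclasses import dataclass

# Two named tuple types with the same fields compare (and hash) as plain tuples.
TupleKeyA = namedtuple("TupleKeyA", "name")
TupleKeyB = namedtuple("TupleKeyB", "name")

tuples_equal = TupleKeyA("x") == TupleKeyB("x")
tuple_dict = {TupleKeyA("x"): "first", TupleKeyB("x"): "second"}  # one entry


# Frozen dataclasses are hashable, and their generated __eq__ checks the class.
@dataclass(frozen=True)
class ClassKeyA:
    name: str


@dataclass(frozen=True)
class ClassKeyB:
    name: str


classes_equal = ClassKeyA("x") == ClassKeyB("x")
class_dict = {ClassKeyA("x"): "first", ClassKeyB("x"): "second"}  # two entries
```

With the named tuples, the second dictionary assignment silently overwrites the first; with the class-based keys, both entries survive even though the field values are identical.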
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639:
URL: https://github.com/apache/airflow/pull/44639#discussion_r1876229094

## airflow/utils/operator_helpers.py: ##
@@ -276,10 +277,14 @@ def _run():
     for metadata in _run():
         if isinstance(metadata, Metadata):
-            outlet_events[metadata.uri].extra.update(metadata.extra)
+            # TODO: handle asset name
+            outlet_events[Asset(uri=metadata.uri)].extra.update(metadata.extra)
             if metadata.alias_name:
-                outlet_events[metadata.alias_name].add(metadata.uri, extra=metadata.extra)
+                # TODO: handle asset name
+                outlet_events[AssetAlias(name=metadata.alias_name)].add(
+                    Asset(uri=metadata.uri), extra=metadata.extra
+                )

Review Comment:
Removed string access and created a deprecation PR: https://github.com/apache/airflow/pull/44791
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875661261 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: ```suggestion def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: ``` Right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875656227 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: removed ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key else: return -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[str, BaseAsset], OutletEventAccessor]): Review Comment: updated -- 
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875678174 ## airflow/utils/operator_helpers.py: ## @@ -276,10 +277,14 @@ def _run(): for metadata in _run(): if isinstance(metadata, Metadata): -outlet_events[metadata.uri].extra.update(metadata.extra) +# TODO: handle asset name + outlet_events[Asset(uri=metadata.uri)].extra.update(metadata.extra) if metadata.alias_name: -outlet_events[metadata.alias_name].add(metadata.uri, extra=metadata.extra) +# TODO: handle asset name + outlet_events[AssetAlias(name=metadata.alias_name)].add( +Asset(uri=metadata.uri), extra=metadata.extra +) Review Comment: Let’s deprecate passing in a URI to Metadata in 2.11 and require an Asset there as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875676469 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset | AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. :meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __iter__(self) -> Iterator[str]: -return iter(self._dict) +def __iter__(self) -> Iterator[Asset | AssetAlias]: +return iter( +key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict +) Review Comment: ah right, Thanks! -- This is an automated message from the Apache Git Service. 
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875673767 ## airflow/utils/context.py: ## @@ -161,54 +176,57 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" -if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +if not isinstance(self.key, AssetAliasUniqueKey): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name -else: -return - -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +asset_alias_name = self.key.name +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[Asset | AssetAlias], OutletEventAccessor]): """ Lazy mapping of outlet asset event accessors. 
:meta private: """ def __init__(self) -> None: -self._dict: dict[str, OutletEventAccessor] = {} +self._dict: dict[BaseAssetUniqueKey, OutletEventAccessor] = {} def __str__(self) -> str: return f"OutletEventAccessors(_dict={self._dict})" -def __iter__(self) -> Iterator[str]: -return iter(self._dict) +def __iter__(self) -> Iterator[Asset | AssetAlias]: +return iter( +key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict +) Review Comment: ```suggestion return ( key.to_asset() if isinstance(key, AssetUniqueKey) else key.to_asset_alias() for key in self._dict ) ``` This generator expression is already an iterator, so there is no need to call `iter()`.
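The point about `iter()` can be checked with a small sketch. The dictionary below is a made-up stand-in for the accessor's internal `_dict`, not code from the PR; it only shows that a generator expression already satisfies the iterator protocol, so wrapping it in `iter()` returns the very same object.

```python
from collections.abc import Iterator

# Hypothetical stand-in for the accessor's internal dict.
keys = {"a": 1, "b": 2}

# A generator expression already implements __iter__ and __next__.
gen = (k.upper() for k in keys)

is_iterator = isinstance(gen, Iterator)
# iter() on an iterator hands back the same object, so the extra call adds nothing.
same_object = iter(gen) is gen
items = list(gen)
```

Returning the generator expression directly from `__iter__` is therefore enough.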
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875670102 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) Review Comment: Ah sorry, I think the code you mentioned is outdated. Not sure why it was still there.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875668352 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: fixed
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875662630 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) Review Comment: Is this a good idea? We can’t be sure if the user intends this to be a name or URI. Maybe we shouldn’t allow it.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875661261 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: ```suggestion def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: ``` Right?
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875656645 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key Review Comment: this has been removed as well
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875609015 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: I think we can just remove them.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550638 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: ~~Yep, this is one name I kinda struggle with 🤦♂️ I can use `from_obj` and `to_obj` for now till we have a better name. I'm not sure whether users need this function. Could we probably mark it private as well?~~
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r187591 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: > Or maybe we should just call these to_asset and to_asset_alias. We really only need polymorphism in one place (the outlet event accessor); it can be pretty easily avoided with an if-else condition.

Yep, it would work, and that's the main reason I named it this way.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550638 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: Yep, this is one name I kinda struggle with 🤦♂️ I can use `from_obj` and `to_obj` for now till we have a better name. I'm not sure whether users need this function. Could we probably mark it private as well?
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550431 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: Or maybe we should just call these `to_asset` and `to_asset_alias`. We really only need polymorphism in one place (the outlet event accessor); it can be pretty easily avoided.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875550431 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: Or maybe we should just call these `to_asset` and `to_asset_alias`. We really only need polymorphism in one place (the outlet event accessor); it can be pretty easily avoided with an if-else condition.
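A rough sketch of the naming suggested above, for illustration only. `Asset` and `AssetAlias` here are simplified stand-in dataclasses, not the real Airflow classes, and `key_to_object` is a hypothetical helper name; the idea is that each key type gets an explicitly named conversion method, and the one call site that handles both kinds branches with an if-else.

```python
from dataclasses import dataclass
from typing import NamedTuple, Union


# Simplified, hypothetical stand-ins for the real Asset and AssetAlias classes.
@dataclass(frozen=True)
class Asset:
    name: str
    uri: str


@dataclass(frozen=True)
class AssetAlias:
    name: str


class AssetUniqueKey(NamedTuple):
    name: str
    uri: str

    @staticmethod
    def from_asset(asset: Asset) -> "AssetUniqueKey":
        return AssetUniqueKey(name=asset.name, uri=asset.uri)

    def to_asset(self) -> Asset:
        return Asset(name=self.name, uri=self.uri)


class AssetAliasUniqueKey(NamedTuple):
    name: str

    @staticmethod
    def from_asset_alias(alias: AssetAlias) -> "AssetAliasUniqueKey":
        return AssetAliasUniqueKey(name=alias.name)

    def to_asset_alias(self) -> AssetAlias:
        return AssetAlias(name=self.name)


def key_to_object(
    key: Union[AssetUniqueKey, AssetAliasUniqueKey],
) -> Union[Asset, AssetAlias]:
    """Hypothetical helper: the single place that branches on the key type."""
    if isinstance(key, AssetUniqueKey):
        return key.to_asset()
    return key.to_asset_alias()
```

With explicit method names, no shared `to_obj` is needed on a base class, at the cost of one `isinstance` check where both key kinds meet.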
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875547997 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key else: return -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[str, BaseAsset], OutletEventAccessor]): Review Comment: Sounds good 👍 will do
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875546737 ## task_sdk/src/airflow/sdk/definitions/asset/__init__.py: ## @@ -54,9 +55,26 @@ class AssetUniqueKey(NamedTuple): uri: str @staticmethod -def from_asset(asset: Asset) -> AssetUniqueKey: +def from_asset(asset: Asset | AssetModel) -> AssetUniqueKey: return AssetUniqueKey(name=asset.name, uri=asset.uri) +def to_obj(self) -> Asset: Review Comment: I wonder if there’s a better name than `obj`… Python doesn’t seem to have an established convention for this. (Rust has `into()`, for example.) Also maybe the `from` part should also be `from_obj`? Not sure.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875547133 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: I think we already have a deprecation warning for this one: https://github.com/apache/airflow/blob/99e713efa398bef9d76eb4e8145538e828a05720/airflow/utils/context.py#L180 Could I just remove all of this string handling?
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875546378 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key Review Comment: Yep, https://github.com/apache/airflow/blob/99e713efa398bef9d76eb4e8145538e828a05720/airflow/utils/context.py#L227 has already deprecated it.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875544423 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key else: return -event = AssetAliasEvent(asset_alias_name, asset_uri, extra=extra or {}) +event = AssetAliasEvent( +source_alias_name=asset_alias_name, +dest_asset_key=AssetUniqueKey.from_asset(asset), +extra=extra or {}, +) self.asset_alias_events.append(event) -class OutletEventAccessors(Mapping[str, OutletEventAccessor]): +class OutletEventAccessors(Mapping[Union[str, BaseAsset], OutletEventAccessor]): Review Comment: This should be something like `Asset | AssetAlias` instead of `BaseAsset` since not all BaseAsset subclasses make sense here (for example it’d be awkward if the user passes in an AssetAny).
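To make the key-type concern concrete, here is a minimal sketch of a lazy mapping that accepts only an asset or an asset alias. Every class below (`Asset`, `AssetAlias`, `AssetAny`, `SketchAccessor`, `SketchAccessors`) is a simplified, hypothetical stand-in rather than Airflow's implementation, and the sketch keys its dict on the frozen dataclasses directly instead of on separate unique-key objects.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, Mapping, Union


# Simplified, hypothetical stand-ins; the real classes carry more fields.
@dataclass(frozen=True)
class Asset:
    name: str
    uri: str


@dataclass(frozen=True)
class AssetAlias:
    name: str


@dataclass(frozen=True)
class AssetAny:
    """Stand-in for a BaseAsset subclass that makes no sense as a key."""

    assets: tuple


@dataclass
class SketchAccessor:
    extra: Dict[str, Any] = field(default_factory=dict)


class SketchAccessors(Mapping[Union[Asset, AssetAlias], SketchAccessor]):
    """Lazy mapping: an accessor is created the first time a key is looked up."""

    def __init__(self) -> None:
        self._dict: Dict[Union[Asset, AssetAlias], SketchAccessor] = {}

    def __iter__(self) -> Iterator[Union[Asset, AssetAlias]]:
        return iter(self._dict)

    def __len__(self) -> int:
        return len(self._dict)

    def __getitem__(self, key: Union[Asset, AssetAlias]) -> SketchAccessor:
        # Reject strings and other asset expressions instead of guessing intent.
        if not isinstance(key, (Asset, AssetAlias)):
            raise KeyError("Key should be either an asset or an asset alias")
        if key not in self._dict:
            self._dict[key] = SketchAccessor()
        return self._dict[key]
```

Because the key includes both `name` and `uri`, two assets that share a URI but differ in name get separate accessors in this sketch, which is the behavior the PR title describes.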
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
Lee-W commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875543526 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: The main reason for keeping it is https://github.com/apache/airflow/issues/43956#issuecomment-2511443068. It's supposed to be removed eventually.
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875542390 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: """Add an AssetEvent to an existing Asset.""" if isinstance(asset, str): -asset_uri = asset -elif isinstance(asset, Asset): -asset_uri = asset.uri -else: +asset = Asset(asset) +elif not isinstance(asset, Asset): return -if isinstance(self.raw_key, str): -asset_alias_name = self.raw_key -elif isinstance(self.raw_key, AssetAlias): -asset_alias_name = self.raw_key.name +if isinstance(self.key, AssetAliasUniqueKey): +asset_alias_name = self.key.name +elif isinstance(self.key, str): +# TODO: deprecate string access +asset_alias_name = self.key Review Comment: We should remove this in 3.0 and deprecate in 2.11.
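For the deprecate-then-remove path, a deprecation step on the older branch might look roughly like the sketch below. It uses only the standard `warnings` module with the built-in `DeprecationWarning`; `resolve_alias_name` and the stand-in `AssetAlias` are hypothetical names for illustration, and Airflow's actual warning category and message may differ.

```python
import warnings
from dataclasses import dataclass
from typing import Union


# Simplified, hypothetical stand-in for the real AssetAlias class.
@dataclass(frozen=True)
class AssetAlias:
    name: str


def resolve_alias_name(key: Union[str, AssetAlias]) -> str:
    """Hypothetical helper: keep string access working, but warn about it."""
    if isinstance(key, str):
        warnings.warn(
            "Accessing outlet events with a string key is deprecated; "
            "pass an AssetAlias instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )
        return key
    return key.name
```

Once the string branch is dropped in the next major version, the helper reduces to `key.name` and the warning disappears along with it.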
Re: [PR] Respect Asset.name when accessing inlet and outlet events [airflow]
uranusjr commented on code in PR #44639: URL: https://github.com/apache/airflow/pull/44639#discussion_r1875541501 ## airflow/utils/context.py: ## @@ -161,54 +178,89 @@ class OutletEventAccessor: :meta private: """ -raw_key: str | Asset | AssetAlias +key: str | BaseAssetUniqueKey extra: dict[str, Any] = attrs.Factory(dict) asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list) -def add(self, asset: Asset | str, extra: dict[str, Any] | None = None) -> None: +def add(self, asset: str | Asset, extra: dict[str, Any] | None = None) -> None: Review Comment: Not sure if we want to support str here, based on the same reasoning. Maybe this should simply accept only Asset. (We can add AssetRef support later.)