Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-31 Thread via GitHub


vincbeck merged PR #45562:
URL: https://github.com/apache/airflow/pull/45562


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-31 Thread via GitHub


vincbeck commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2628587999

   > All good, thanks @vincbeck
   
   Thank you for the thorough review! I really like the changes that came out of your feedback and @uranusjr's as well!



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-31 Thread via GitHub


ashb commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2627074548

   All good, thanks @vincbeck 



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-30 Thread via GitHub


vincbeck commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2625637538

   @ashb Are you okay with these changes? I'll make the changes related to the 
discussion we had on Slack (creating the interface `BaseEventTrigger`) in a 
separate PR. 



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-29 Thread via GitHub


vincbeck commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2622616228

   Other questions/concerns? :)



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-29 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1933555018


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -36,6 +36,7 @@
 from sqlalchemy.orm import Session
 
 from airflow.models.asset import AssetModel
+from airflow.serialization.serialized_objects import SerializedAssetWatcher

Review Comment:
   Oh, it's under `TYPE_CHECKING`; I missed that. It's fine for now then.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-28 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1932404499


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -36,6 +36,7 @@
 from sqlalchemy.orm import Session
 
 from airflow.models.asset import AssetModel
+from airflow.serialization.serialized_objects import SerializedAssetWatcher

Review Comment:
   It is under `TYPE_CHECKING`; shouldn't that prevent dependency cycles? I am not familiar with protocols in Python, but if you think that would be better, why not. Should we do that in another PR, though? So far there have been no dependency cycles; what makes you think we will get some later?
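   For context on the `TYPE_CHECKING` point: imports guarded by `typing.TYPE_CHECKING` are read only by static type checkers such as mypy and are never executed at runtime, so they cannot create a runtime import cycle (they still couple the modules at type-checking time). A minimal sketch, where `hypothetical_scheduler_module` is a made-up stand-in for the real import:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers; this module need not exist at runtime.
    from hypothetical_scheduler_module import SerializedAssetWatcher


def count_watchers(watchers: list[SerializedAssetWatcher]) -> int:
    # With postponed evaluation of annotations (PEP 563), the annotation above is
    # kept as a string and is never resolved when the function is defined or called.
    return len(watchers)


print(TYPE_CHECKING)  # False at runtime
print(count_watchers([]))  # 0
```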




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-28 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1932397517


##
task_sdk/src/airflow/sdk/__init__.py:
##
@@ -31,11 +31,15 @@
 "dag",
 "get_current_context",
 "get_parsing_context",
+"__version__",
+"Asset",
+"AssetWatcher",

Review Comment:
   Done :) I agree on the ruff rule; we can do that in a separate PR.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-28 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1931986671


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -36,6 +36,7 @@
 from sqlalchemy.orm import Session
 
 from airflow.models.asset import AssetModel
+from airflow.serialization.serialized_objects import SerializedAssetWatcher

Review Comment:
   Importing this here is going to cause dependency cycles.
   
   I think that instead of importing this class we should define a Protocol and then use `watchers: list[AssetWatcherProtocol]`, if we can?
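   The Protocol idea can be sketched with `typing.Protocol`: the asset definition depends only on a structural interface, and any class with matching attributes (for example a scheduler-side serialized watcher) satisfies it without being imported. `AssetWatcherProtocol` is the name proposed in the comment; its attributes and the `SerializedWatcherSketch`/`AssetSketch` classes are assumptions for illustration, not the merged code.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AssetWatcherProtocol(Protocol):
    """Structural interface: anything with these attributes qualifies."""

    name: str
    trigger: Any


@dataclass(frozen=True)
class SerializedWatcherSketch:
    # Hypothetical scheduler-side class; it does not inherit from the protocol.
    name: str
    trigger: dict[str, Any]


@dataclass
class AssetSketch:
    # Hypothetical stand-in for Asset; only the protocol is referenced here.
    name: str
    watchers: list[AssetWatcherProtocol] = field(default_factory=list)


watcher = SerializedWatcherSketch(name="my_queue_watcher", trigger={"classpath": "x.Y", "kwargs": {}})
asset = AssetSketch(name="my_asset", watchers=[watcher])
print(isinstance(watcher, AssetWatcherProtocol))  # True
```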




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-28 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1931984722


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -251,6 +253,19 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""

Review Comment:
   ```suggestion
       """A representation of an asset watcher. The name uniquely identifies the watch."""
   ```




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-28 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1931984062


##
task_sdk/src/airflow/sdk/__init__.py:
##
@@ -31,11 +31,15 @@
 "dag",
 "get_current_context",
 "get_parsing_context",
+"__version__",
+"Asset",
+"AssetWatcher",

Review Comment:
   Nit: `__version__` is duplicated. Can you also sort this list?
   
   And we should probably enable https://docs.astral.sh/ruff/rules/unsorted-dunder-all/#unsorted-dunder-all-ruf022 to have ruff enforce the order and sort it for us.
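   Until the ruff rule is enabled, the two problems flagged here (a duplicated entry and an unsorted list) can be caught with a small check. This is a hypothetical helper using plain `sorted()`; ruff's RUF022 applies its own isort-style ordering, which may not match plain lexicographic order.

```python
from collections import Counter


def check_dunder_all(names: list[str]) -> list[str]:
    """Return human-readable problems found in an ``__all__`` list."""
    problems = []
    duplicates = sorted(name for name, count in Counter(names).items() if count > 1)
    if duplicates:
        problems.append(f"duplicated: {', '.join(duplicates)}")
    if names != sorted(names):
        problems.append("not sorted")
    return problems


# Illustrative list, not the actual contents of task_sdk/src/airflow/sdk/__init__.py.
print(check_dunder_all(["dag", "__version__", "Asset", "AssetWatcher", "__version__"]))
# ['duplicated: __version__', 'not sorted']
print(check_dunder_all(["Asset", "AssetWatcher", "__version__", "dag"]))
# []
```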




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1926040582


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   You are :) A lot! Thanks for the details; they really help me understand. I created a new class, `SerializedAssetWatcher`, and use it whenever I deserialize. In `collection.py`, when I fetch the watchers, I cast them to `SerializedAssetWatcher` because `asset.watchers` can contain either `AssetWatcher` or `SerializedAssetWatcher` instances. Let me know if this is what you had in mind :)




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925918061


##
airflow/serialization/serialized_objects.py:
##
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   I factored the code out into a new function, `decode_asset`.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925781074


##
airflow/serialization/serialized_objects.py:
##
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   I applied this one but reverted it because `var` contains some keys we do not want to pass to `Asset`. See the error [here](https://github.com/apache/airflow/actions/runs/12914159501/job/36013477933?pr=45562).
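   The `**var` failure comes from `var` carrying serialization metadata (at least the `__type` key that `decode_asset_condition` reads) which `Asset` does not accept. One way to share a single implementation is to pick the constructor fields explicitly. This is a hypothetical sketch with a stand-in `AssetStub` class, not the `decode_asset` function that was merged.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class AssetStub:
    # Stand-in for Asset with the fields used in decode_asset_condition.
    name: str
    uri: str
    group: str
    extra: dict[str, Any] = field(default_factory=dict)
    watchers: list[dict[str, Any]] = field(default_factory=list)


def decode_asset_sketch(var: dict[str, Any]) -> AssetStub:
    """Build the asset from known fields only, ignoring metadata such as ``__type``."""
    return AssetStub(
        name=var["name"],
        uri=var["uri"],
        group=var["group"],
        extra=var["extra"],
        # "watchers" may be absent when the list was empty at serialization time.
        watchers=var.get("watchers", []),
    )


serialized = {"__type": "asset", "name": "a", "uri": "test://asset/", "group": "asset", "extra": {}}
print(decode_asset_sketch(serialized))
```

   By contrast, `AssetStub(**serialized)` raises `TypeError` because of the unexpected `__type` key, which mirrors the CI failure described above.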




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925761298


##
airflow/serialization/serialized_objects.py:
##
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   I don't mind two versions per se, but it'd be nice if they had the same implementation.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925760803


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   > Do you want to use it only by users? I am trying to understand the path forward :)
   
   That is my end goal, yes, and Asset would need updating too. Maybe it's best to do it all at once. For instance, on a SchedulerDag object or a SchedulerBaseOperator (not the final names) the interface will be greatly reduced and contain only the minimum needed to set relationships between tasks; a lot of the API "richness" and compatibility code would not be needed in that class.
   
   But it's probably more confusing to have one class be like that but not others.
   
   That said, one thing you _might_ be able to do here is have a `SerializedAssetWatcher` class which is a subclass of this one, with this field as a different type. This is the pattern we already have for SerializedBaseOperator, i.e. the deserialize code should create instances of `SerializedAssetWatcher`, not `AssetWatcher`.
   
   If that works with mypy (it might complain about the type of the field not matching the parent), I think this approach is a small change here but makes my life much easier in this bigger refactor when I get to it.
   
   Am I making any sense?
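   The subclass pattern suggested here can be sketched as follows. The real classes use `attrs.define(frozen=True)`; this sketch uses standard-library dataclasses so it runs without attrs, and `TriggerStub`, the serialize/deserialize helpers, and the `classpath`/`kwargs` dict layout (taken from the `collection.py` hunk) are assumptions for illustration. As the comment anticipates, mypy may flag the narrowed field type.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class TriggerStub:
    """Hypothetical stand-in for a BaseTrigger subclass such as FileTrigger."""

    def __init__(self, filepath: str) -> None:
        self.filepath = filepath

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return "example.triggers.TriggerStub", {"filepath": self.filepath}


@dataclass(frozen=True)
class AssetWatcher:
    # User-facing: holds a live trigger object.
    name: str
    trigger: TriggerStub


@dataclass(frozen=True)
class SerializedAssetWatcher(AssetWatcher):
    # Scheduler-facing: same fields, but the trigger stays in serialized form.
    # mypy may report an incompatible override here, as noted in the review.
    trigger: dict[str, Any]  # type: ignore[assignment]


def serialize_watcher(watcher: AssetWatcher) -> dict[str, Any]:
    classpath, kwargs = watcher.trigger.serialize()
    return {"name": watcher.name, "trigger": {"classpath": classpath, "kwargs": kwargs}}


def deserialize_watcher(data: dict[str, Any]) -> SerializedAssetWatcher:
    # Deserialization never imports or instantiates the trigger class.
    return SerializedAssetWatcher(name=data["name"], trigger=data["trigger"])


w = AssetWatcher(name="file_watcher", trigger=TriggerStub("/tmp/test"))
restored = deserialize_watcher(serialize_watcher(w))
print(restored.trigger["kwargs"])  # {'filepath': '/tmp/test'}
```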




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925735076


##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   Are you suggesting we should not use this class when we deserialize the DAG? Currently this class is used by users when defining their DAGs and by the scheduler when it deserializes the DAG from the DB. Do you want to use it only by users? I am trying to understand the path forward :)
   
   Don't we have the same problem with classes like `Asset`?




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925654427


##
airflow/dag_processing/collection.py:
##
@@ -737,16 +735,19 @@ def add_asset_trigger_references(
         # Update references from assets being used
         refs_to_add: dict[tuple[str, str], set[int]] = {}
         refs_to_remove: dict[tuple[str, str], set[int]] = {}
-        triggers: dict[int, BaseTrigger] = {}
+        triggers: dict[int, dict] = {}
 
         # Optimization: if no asset collected, skip fetching active assets
         active_assets = _find_active_assets(self.assets.keys(), session=session) if self.assets else {}
 
         for name_uri, asset in self.assets.items():
             # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it
-            asset_watchers = asset.watchers if name_uri in active_assets else []
-            trigger_hash_to_trigger_dict: dict[int, BaseTrigger] = {
-                self._get_base_trigger_hash(trigger): trigger for trigger in asset_watchers
+            asset_watchers: list[AssetWatcher] = asset.watchers if name_uri in active_assets else []
+            trigger_hash_to_trigger_dict: dict[int, dict] = {
+                self._get_trigger_hash(
+                    cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
+                ): cast(dict, watcher.trigger)

Review Comment:
   The `trigger` property in `AssetWatcher` can be a `BaseTrigger` or a `dict`, depending on where the `AssetWatcher` object is created:
   - In the DAG definition (by users), `trigger` is a `BaseTrigger`
   - When deserializing the DAG from the DB, `trigger` is a `dict`, to avoid instantiating the trigger as part of the deserialization process (based on your comment and TP's suggestion [here](https://github.com/apache/airflow/pull/45562#discussion_r1917646149))
   
   Here I cast it to keep mypy happy.
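   Instead of `cast`, the two shapes of `trigger` can be narrowed with `isinstance`, which mypy also understands. This is a hypothetical sketch: `trigger_hash` is an illustrative stand-in for the `_get_trigger_hash` helper in `collection.py`, whose real implementation is not shown in this thread.

```python
from __future__ import annotations

import json
from typing import Any


def trigger_hash(classpath: str, kwargs: dict[str, Any]) -> int:
    # Hypothetical: sort_keys makes the result independent of kwargs insertion order.
    # Python's hash() of strings is randomized per process, so this is only
    # suitable for in-memory dict keys, not for values persisted to a database.
    return hash((classpath, json.dumps(kwargs, sort_keys=True, default=str)))


def trigger_as_dict(trigger: Any) -> dict[str, Any]:
    """Normalize a watcher's trigger to the serialized {"classpath", "kwargs"} form."""
    if isinstance(trigger, dict):
        # Deserialized from the DB: already in serialized form.
        return trigger
    # Defined in a DAG file: a trigger object exposing serialize() -> (classpath, kwargs).
    classpath, kwargs = trigger.serialize()
    return {"classpath": classpath, "kwargs": kwargs}


watcher_triggers = [
    {"classpath": "example.FileTrigger", "kwargs": {"filepath": "/tmp/a"}},
    {"classpath": "example.FileTrigger", "kwargs": {"filepath": "/tmp/b"}},
]
trigger_hash_to_trigger_dict = {
    trigger_hash(t["classpath"], t["kwargs"]): t for t in map(trigger_as_dict, watcher_triggers)
}
print(len(trigger_hash_to_trigger_dict))  # 2
```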



##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher
 
-file_path = tempfile.NamedTemporaryFile().name
+file_path = "/tmp/test"

Review Comment:
   Yes, to make one serialization/deserialization test pass. One of the tests serializes and deserializes all the example DAGs and expects each example DAG to equal its serialized-then-deserialized copy (a round trip). With the temporary file name, the test failed because `file_path` was different. I could have mocked it, but I also realized that `/tmp/` is widely used as a temporary directory in tests, so I went with the easy solution :)
   
   Let me know if you want to keep `tempfile`; I'll have to play with mocks then.
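   The round-trip failure can be reproduced in miniature: `tempfile.NamedTemporaryFile().name` yields a new random path on each evaluation, so two evaluations of the same example DAG code disagree, while a fixed path is stable. `build_example_config` is a hypothetical stand-in for parsing the example DAG file.

```python
import tempfile


def build_example_config(use_tempfile: bool) -> dict:
    """Hypothetical stand-in for the module-level code in the example DAG."""
    if use_tempfile:
        # Each call creates a new, uniquely named temporary file (deleted once the
        # unreferenced object is garbage-collected), so the path differs between calls.
        file_path = tempfile.NamedTemporaryFile().name
    else:
        file_path = "/tmp/test"
    return {"trigger": "FileTrigger", "filepath": file_path}


# Simulate "evaluate once, evaluate again, compare".
print(build_example_config(True) == build_example_config(True))  # False
print(build_example_config(False) == build_example_config(False))  # True
```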



##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher

Review Comment:
   Makes sense. Done



##
airflow/serialization/serialized_objects.py:
##
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   I don't have the answer to this one. What I know is that one is at the root level of `deserialize` (deserializing an asset) and the other one is in `decode_asset_condition` (deserializing an asset condition). I can create a private function to factor out the common code if you prefer.



##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, ho

Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925543932


##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher
 
-file_path = tempfile.NamedTemporaryFile().name
+file_path = "/tmp/test"

Review Comment:
   Did we need to change this?



##
airflow/example_dags/example_asset_with_watchers.py:
##
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher

Review Comment:
   For things in the "public" API that are designed for direct use in DAGs, we should add them to the lazy imports in the Task SDK's `__init__` and make this:
   
   ```suggestion
   from airflow.sdk import Asset, AssetWatcher
   ```
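   Lazy imports like the ones mentioned here are commonly built on a module-level `__getattr__` (PEP 562): public names are declared up front, but the defining module is only imported on first attribute access. A minimal sketch of that pattern; `make_lazy_module` and the name-to-module mapping are hypothetical, and the mapping points at standard-library modules (rather than Task SDK modules) so the sketch runs anywhere.

```python
from __future__ import annotations

import importlib
import types
from typing import Any

# Hypothetical name -> defining-module mapping. A real SDK would map public names
# such as "Asset" to its own submodules; stdlib targets keep this runnable.
_LAZY_IMPORTS: dict[str, str] = {
    "OrderedDict": "collections",
    "dedent": "textwrap",
}


def make_lazy_module(name: str, lazy_imports: dict[str, str]) -> types.ModuleType:
    module = types.ModuleType(name)
    module.__all__ = sorted(lazy_imports)

    def __getattr__(attr: str) -> Any:
        # PEP 562: only called when normal attribute lookup on the module fails.
        module_path = lazy_imports.get(attr)
        if module_path is None:
            raise AttributeError(f"module {name!r} has no attribute {attr!r}")
        value = getattr(importlib.import_module(module_path), attr)
        setattr(module, attr, value)  # cache so the hook runs once per name
        return value

    module.__getattr__ = __getattr__
    return module


sdk = make_lazy_module("sdk_sketch", _LAZY_IMPORTS)
print(sdk.dedent("    hello"))  # hello
```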



##
airflow/serialization/serialized_objects.py:
##
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   Consistency, please: there is another deserialize (why are there two? Separate question, I guess), but
   ```suggestion
   **var,
   ```



##
airflow/dag_processing/collection.py:
##
@@ -737,16 +735,19 @@ def add_asset_trigger_references(
         # Update references from assets being used
         refs_to_add: dict[tuple[str, str], set[int]] = {}
         refs_to_remove: dict[tuple[str, str], set[int]] = {}
-        triggers: dict[int, BaseTrigger] = {}
+        triggers: dict[int, dict] = {}
 
         # Optimization: if no asset collected, skip fetching active assets
         active_assets = _find_active_assets(self.assets.keys(), session=session) if self.assets else {}
 
         for name_uri, asset in self.assets.items():
             # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it
-            asset_watchers = asset.watchers if name_uri in active_assets else []
-            trigger_hash_to_trigger_dict: dict[int, BaseTrigger] = {
-                self._get_base_trigger_hash(trigger): trigger for trigger in asset_watchers
+            asset_watchers: list[AssetWatcher] = asset.watchers if name_uri in active_assets else []
+            trigger_hash_to_trigger_dict: dict[int, dict] = {
+                self._get_trigger_hash(
+                    cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
+                ): cast(dict, watcher.trigger)

Review Comment:
   These `cast`s look odd. What's going on here?
   
   At least this looks like it should work?
   ```suggestion
   watcher.trigger["classpath"], watcher.trigger["kwargs"]
   ): cast(dict, watcher.trigger)
   ```



##
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   I don't think we should make this class do double duty like this. We already have SerializedBaseOperator etc., and I want to try very hard to avoid putting a run-time dependency on Task SDK code in the core scheduler.
   
   I.e., I'd like the scheduler to operate only on a class that is deserialized from the DB and that does not inherit from this class.
   
   (I have yet to do that for SerializedDAG and SerializedBaseOperator, but I need to before we can release 3.0.)




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-21 Thread via GitHub


ashb commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2605063606

   I'll take another look today or tomorrow.



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-21 Thread via GitHub


vincbeck commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2604987283

   @uranusjr @ashb what do you guys think? 



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-17 Thread via GitHub


vincbeck commented on PR #45562:
URL: https://github.com/apache/airflow/pull/45562#issuecomment-2598489688

   I addressed your feedback, but I also added something new. It was actually a suggestion from @dstandish that I think is a great idea! I introduced the concept of `AssetWatcher`, a thin wrapper around a trigger. Explanation:
   
   Instead of having:
   
   ```
   trigger = FileTrigger()
   asset = Asset("", watchers=[trigger])
   ```
   
   Now it is:
   
   ```
   trigger = FileTrigger()
   asset = Asset("", watchers=[AssetWatcher(name="my_queue_watcher", trigger=trigger)])
   ```
   
   `AssetWatcher` does not add much besides a name, but I think it is important because in the future we might want to surface the relations between assets and triggers in the graph view of the UI. Representing a trigger in the graph can be tricky because triggers have 2 pieces of information, `classpath` and `kwargs`, and neither is well suited for display. `classpath` is debatable, but it does not really identify the trigger anyway. Example: you might have an asset with 2 watchers, each using `FileTrigger` to monitor a different file. Displaying only the classpath will not help the user tell which one is which. Hence the name.
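   The display problem can be illustrated with two watchers that share a trigger class: labels built from the classpath collide, while the watcher names stay distinct. The `Watcher` dataclass, the watcher names, and the file paths are illustrative assumptions, not the real classes or data.

```python
from dataclasses import dataclass, field
from typing import Any

FILE_TRIGGER = "airflow.providers.standard.triggers.file.FileTrigger"


@dataclass(frozen=True)
class Watcher:
    # Hypothetical mirror of AssetWatcher holding a serialized trigger.
    name: str
    classpath: str
    kwargs: dict[str, Any] = field(default_factory=dict)


watchers = [
    Watcher("incoming_orders", FILE_TRIGGER, {"filepath": "/data/orders.csv"}),
    Watcher("incoming_refunds", FILE_TRIGGER, {"filepath": "/data/refunds.csv"}),
]

by_classpath = [w.classpath for w in watchers]
by_name = [w.name for w in watchers]
print(len(set(by_classpath)), len(set(by_name)))  # 1 2
```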



Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-16 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1919214992


##
airflow/triggers/base.py:
##
@@ -124,6 +124,11 @@ def __repr__(self) -> str:
 classpath, kwargs = self.serialize()
 return self.repr(classpath, kwargs)
 
+def __eq__(self, other: Any) -> bool:

Review Comment:
   Well, the test no longer fails, so I removed it. I am glad you pointed it out.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-16 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1918886705


##
airflow/triggers/base.py:
##
@@ -124,6 +124,11 @@ def __repr__(self) -> str:
 classpath, kwargs = self.serialize()
 return self.repr(classpath, kwargs)
 
+def __eq__(self, other: Any) -> bool:

Review Comment:
   That's for a test. Without it, the test fails, though I don't remember which one. I'll remove it to find out, and we can see from there whether it should be addressed that way.



##
airflow/serialization/serialized_objects.py:
##
@@ -297,6 +306,25 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     raise ValueError(f"deserialization not implemented for DAT {dat!r}")
 
 
+def decode_asset(var: dict[str, Any]) -> BaseAsset:
+    """
+    Decode a previously serialized asset.
+
+    :meta private:
+    """
+
+    def _decode_trigger(trigger_infos: dict[str, Any]) -> BaseTrigger:
+        return import_string(trigger_infos["classpath"])(**trigger_infos["kwargs"])

Review Comment:
   Thanks a lot for the thorough review! You are right, we do not need to create the trigger object. I was creating it to be consistent regardless of whether the source was the DAG or the DB (serialized DAG). But as you pointed out, we can handle the 2 use cases differently, and that way there is no need to create the trigger object.



##
tests/timetables/test_assets_timetable.py:
##
@@ -142,6 +142,7 @@ def test_serialization(asset_timetable: AssetOrTimeSchedule, monkeypatch: Any) -
 "uri": "test://asset/",
 "group": "asset",
 "extra": {},
+"watchers": [],

Review Comment:
   That makes sense! I removed it when there are no watchers.




Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-16 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1918162136


##
tests/timetables/test_assets_timetable.py:
##
@@ -142,6 +142,7 @@ def test_serialization(asset_timetable: AssetOrTimeSchedule, monkeypatch: Any) -
 "uri": "test://asset/",
 "group": "asset",
 "extra": {},
+"watchers": [],

Review Comment:
   Serialized DAG space is "precious": if a value is empty or the default, please leave it out of the serialized JSON.
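A minimal sketch of this "omit the empty/default value" rule, assuming a hypothetical `encode_asset` helper that receives the asset fields as plain values (the real encoder in `airflow/serialization/serialized_objects.py` is not shown in this thread). The `watchers` key is written only when there is at least one watcher, and the reader treats a missing key as an empty list:

```python
from __future__ import annotations

from typing import Any


def encode_asset(
    uri: str,
    group: str,
    extra: dict[str, Any],
    watchers: list[dict[str, Any]],
) -> dict[str, Any]:
    """Hypothetical encoder: always emit the core fields, but emit
    ``watchers`` only when there is at least one watcher."""
    data: dict[str, Any] = {"uri": uri, "group": group, "extra": extra}
    if watchers:  # skip the key entirely for the empty/default case
        data["watchers"] = watchers
    return data


def decode_watchers(data: dict[str, Any]) -> list[dict[str, Any]]:
    # A missing key means "no watchers", mirroring the encoder above.
    return data.get("watchers", [])
```

With this shape, an asset without watchers produces the same JSON as before the `watchers` field existed, so the `"watchers": []` line is not needed in the test expectation.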






Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-15 Thread via GitHub


uranusjr commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1917646149


##
airflow/serialization/serialized_objects.py:
##
@@ -297,6 +306,25 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     raise ValueError(f"deserialization not implemented for DAT {dat!r}")
 
 
+def decode_asset(var: dict[str, Any]) -> BaseAsset:
+    """
+    Decode a previously serialized asset.
+
+    :meta private:
+    """
+
+    def _decode_trigger(trigger_infos: dict[str, Any]) -> BaseTrigger:
+        return import_string(trigger_infos["classpath"])(**trigger_infos["kwargs"])

Review Comment:
   We should check that we are importing from a known location. This is how timetables are decoded: the class must be defined under the `airflow` namespace or come from a plugin-registered location. be7efb1d30929a7f742f5b7735a3d6fbadadd352
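A minimal sketch of such a check, assuming hypothetical `is_allowed_classpath` and `import_trusted` helpers and a caller-supplied set of plugin-registered classpaths. It is modelled loosely on the rule described for timetables, not copied from Airflow's implementation. The key point is that validation happens before anything is imported, so an untrusted serialized DAG cannot cause an arbitrary module to be imported:

```python
from __future__ import annotations

import importlib
from typing import Any


def is_allowed_classpath(classpath: str, plugin_classpaths: frozenset[str]) -> bool:
    """Hypothetical rule: accept classes under the ``airflow`` package or
    classpaths that a plugin has explicitly registered."""
    if classpath in plugin_classpaths:
        return True
    # The trailing dot rejects look-alike packages such as ``airflow_evil``.
    return classpath.startswith("airflow.")


def import_trusted(classpath: str, plugin_classpaths: frozenset[str]) -> Any:
    """Resolve ``package.module.Name`` only after it passes the check."""
    if not is_allowed_classpath(classpath, plugin_classpaths):
        # Fail before importing anything: the serialized DAG is untrusted input.
        raise ImportError(f"refusing to import untrusted classpath {classpath!r}")
    module_path, _, attr = classpath.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)
```

A prefix check alone still trusts everything importable under `airflow.`; a stricter variant could additionally require the resolved object to be a trigger subclass.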
   
   With that said, I don't think we actually need the full trigger objects here in the first place? The decoded trigger here simply gets serialised again to be stored in the database anyway.
   
   
https://github.com/apache/airflow/blob/93441c9adb3ca7be86d9dd8943a968ed93cc015f/airflow/dag_processing/collection.py#L748-L808
   
   It's likely better to simply have a serialised trigger representation here instead, similar to how MappedOperator gets deserialised. MappedOperator does not import the underlying operator class when deserialised; it only stores the needed values in a dict, and only fully saturates the class in the worker.
   
   
https://github.com/apache/airflow/blob/93441c9adb3ca7be86d9dd8943a968ed93cc015f/airflow/models/mappedoperator.py#L257-L262
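A minimal sketch of that alternative, assuming a hypothetical `SerializedTrigger` container (not an existing Airflow class). Decoding copies only `classpath` and `kwargs` out of the JSON, and re-encoding for the database returns them unchanged, so the trigger class is never imported on this path. Importing and instantiating would be deferred to wherever a live trigger is actually needed:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class SerializedTrigger:
    """Hypothetical stand-in for a decoded trigger: plain data, no import."""

    classpath: str
    kwargs: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def decode(cls, trigger_infos: dict[str, Any]) -> SerializedTrigger:
        # Copy values out of the stored JSON; the classpath is NOT imported.
        return cls(
            classpath=trigger_infos["classpath"],
            kwargs=dict(trigger_infos["kwargs"]),
        )

    def encode(self) -> dict[str, Any]:
        # Round-trip straight back to the stored form, e.g. for the database.
        return {"classpath": self.classpath, "kwargs": dict(self.kwargs)}
```

Because `classpath` is never resolved, decoding succeeds even when the named class is not importable in the current process.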






Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-15 Thread via GitHub


uranusjr commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1917649765


##
airflow/triggers/base.py:
##
@@ -124,6 +124,11 @@ def __repr__(self) -> str:
         classpath, kwargs = self.serialize()
         return self.repr(classpath, kwargs)
 
+    def __eq__(self, other: Any) -> bool:

Review Comment:
   Where do we need this?






Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-15 Thread via GitHub


vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1917175713


##
airflow/serialization/serialized_objects.py:
##
@@ -297,6 +306,25 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     raise ValueError(f"deserialization not implemented for DAT {dat!r}")
 
 
+def decode_asset(var: dict[str, Any]) -> BaseAsset:
+    """
+    Decode a previously serialized asset.
+
+    :meta private:
+    """
+
+    def _decode_trigger(trigger_infos: dict[str, Any]) -> BaseTrigger:
+        return import_string(trigger_infos["classpath"])(**trigger_infos["kwargs"])

Review Comment:
   As far as I understand, this is how triggers are designed. The triggerer 
works the same way: it retrieves the trigger classpath from the database and 
imports it. How would you suggest doing it differently?






Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-15 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1917136175


##
airflow/triggers/base.py:
##
@@ -124,6 +124,11 @@ def __repr__(self) -> str:
         classpath, kwargs = self.serialize()
         return self.repr(classpath, kwargs)
 
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, BaseTrigger):
+            return False

Review Comment:
   ```suggestion
           if not isinstance(other, BaseTrigger):
               return NotImplemented
   ```
   Returning `NotImplemented` gives `other` a chance to have its `__eq__` method called.
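A minimal sketch of the equality behaviour under discussion, using a hypothetical `ToyTrigger` that mimics the `serialize()` contract `__repr__` relies on above (returning a `(classpath, kwargs)` tuple). Two triggers compare equal when they serialize identically. For a non-trigger operand, returning `NotImplemented` lets Python try the reflected `other.__eq__(self)` and then fall back to identity comparison, instead of forcing `False`:

```python
from __future__ import annotations

from typing import Any


class ToyTrigger:
    """Hypothetical trigger mirroring a (classpath, kwargs) serialize() contract."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return f"{type(self).__module__}.{type(self).__qualname__}", self.kwargs

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, ToyTrigger):
            # Let Python try other.__eq__(self) before falling back to identity.
            return NotImplemented
        return self.serialize() == other.serialize()


class MatchesAnything:
    """Toy operand whose __eq__ always answers True."""

    def __eq__(self, other: Any) -> bool:
        return True
```

Here `ToyTrigger(a=1) == MatchesAnything()` is `True` because the trigger defers to the other operand, while `ToyTrigger(a=1) == "x"` is still `False` through the identity fallback. Defining `__eq__` without `__hash__` sets `__hash__` to `None`, so instances of such a class become unhashable.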






Re: [PR] AIP-82 Handle trigger serialization [airflow]

2025-01-15 Thread via GitHub


ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1917133923


##
airflow/serialization/serialized_objects.py:
##
@@ -297,6 +306,25 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     raise ValueError(f"deserialization not implemented for DAT {dat!r}")
 
 
+def decode_asset(var: dict[str, Any]) -> BaseAsset:
+    """
+    Decode a previously serialized asset.
+
+    :meta private:
+    """
+
+    def _decode_trigger(trigger_infos: dict[str, Any]) -> BaseTrigger:
+        return import_string(trigger_infos["classpath"])(**trigger_infos["kwargs"])

Review Comment:
   I don't like that this will blindly import whatever string it's given and instantiate the class. This feels like a massive security incident waiting to happen. The serialized DAG is not to be trusted.


