vincbeck commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925654427


##########
airflow/dag_processing/collection.py:
##########
@@ -737,16 +735,19 @@ def add_asset_trigger_references(
         # Update references from assets being used
         refs_to_add: dict[tuple[str, str], set[int]] = {}
         refs_to_remove: dict[tuple[str, str], set[int]] = {}
-        triggers: dict[int, BaseTrigger] = {}
+        triggers: dict[int, dict] = {}
 
         # Optimization: if no asset collected, skip fetching active assets
         active_assets = _find_active_assets(self.assets.keys(), session=session) if self.assets else {}
 
         for name_uri, asset in self.assets.items():
             # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it
-            asset_watchers = asset.watchers if name_uri in active_assets else []
-            trigger_hash_to_trigger_dict: dict[int, BaseTrigger] = {
-                self._get_base_trigger_hash(trigger): trigger for trigger in asset_watchers
+            asset_watchers: list[AssetWatcher] = asset.watchers if name_uri in active_assets else []
+            trigger_hash_to_trigger_dict: dict[int, dict] = {
+                self._get_trigger_hash(
+                    cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
+                ): cast(dict, watcher.trigger)

Review Comment:
   The `trigger` property in `AssetWatcher` can be either a `BaseTrigger` or a `dict`, depending on where the `AssetWatcher` object is created:
   - In the DAG definition (by users), `trigger` is a `BaseTrigger`
   - When deserializing the DAG from the DB, `trigger` is a `dict`, to avoid creating the `AssetWatcher` as part of the deserialization process (based on your comment and the suggestion by TP [here](https://github.com/apache/airflow/pull/45562#discussion_r1917646149))
   
   Here I cast it to a `dict` to make Mypy happy.
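
   A minimal sketch of how the two runtime shapes of `watcher.trigger` could be normalized in one place instead of scattering `cast` calls (the helper name `normalize_trigger` is hypothetical, not part of the PR; it relies on `BaseTrigger.serialize()` returning a `(classpath, kwargs)` pair):

```python
from typing import Any


def normalize_trigger(trigger: Any) -> tuple[str, dict]:
    """Hypothetical helper: return (classpath, kwargs) for either shape.

    `trigger` is a serialized dict when the DAG was deserialized from the DB,
    or a live BaseTrigger (whose serialize() yields the same pair) when the
    DAG was authored by a user.
    """
    if isinstance(trigger, dict):
        # Already serialized: the dict carries classpath and kwargs directly.
        return trigger["classpath"], trigger["kwargs"]
    # Live trigger instance: delegate to its own serialization.
    return trigger.serialize()
```

   With such a helper, the dict comprehension could call `self._get_trigger_hash(*normalize_trigger(watcher.trigger))` for both shapes.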



##########
airflow/example_dags/example_asset_with_watchers.py:
##########
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher
 
-file_path = tempfile.NamedTemporaryFile().name
+file_path = "/tmp/test"

Review Comment:
   Yes, to make one serialization/deserialization test happy. One of the tests serializes and deserializes all the example DAGs and expects each original example DAG to equal its serialized -> deserialized counterpart (round-trip transformation). The tempfile-based path made that test fail because `file_path` was different on each import. I could have mocked it, but I also realized that `/tmp/` is widely used as a temporary directory in tests, so I went with the easy solution :)
   
   Let me know if you want to keep the tempfile approach; I'll have to play with mocks then.
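
   For context, the round-trip property the test enforces can be sketched like this (illustrative stand-ins, not the actual Airflow test code): a fixed path such as `/tmp/test` survives the trip, whereas `tempfile.NamedTemporaryFile().name` would produce a different value on each import of the DAG file, so the re-imported DAG would never equal the deserialized one.

```python
import json


def serialize(dag: dict) -> str:
    # Stand-in for serializing a DAG to its DB representation.
    return json.dumps(dag, sort_keys=True)


def deserialize(blob: str) -> dict:
    # Stand-in for rebuilding the DAG from the DB representation.
    return json.loads(blob)


dag = {"dag_id": "example_asset_with_watchers", "file_path": "/tmp/test"}
# The round trip holds because file_path is deterministic across imports.
assert deserialize(serialize(dag)) == dag
```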



##########
airflow/example_dags/example_asset_with_watchers.py:
##########
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher

Review Comment:
   Makes sense. Done



##########
airflow/serialization/serialized_objects.py:
##########
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   I don't have the answer for this one. What I know is that one is at the root level of `deserialize` (so it deserializes an asset) and the other is in `decode_asset_condition` (so it deserializes an asset condition). I can extract a private function to factor out the shared code if you prefer.



##########
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##########
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the 
watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   Are you suggesting we should not use this class when we deserialize the DAG? Currently this class is used both by users when defining their DAGs and by the scheduler when it deserializes the DAG from the DB. Do you want it to be used only by users? I am trying to understand the path forward :)
   
   Don't we have the same problem with classes like `Asset`?
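
   To illustrate the dual-purpose field under discussion, here is a simplified stand-in (a stdlib frozen dataclass instead of the real attrs-based `AssetWatcher`; names are illustrative). The key point is that any consumer must branch on the runtime type of `trigger`:

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class WatcherSketch:
    """Simplified stand-in for AssetWatcher (illustrative only)."""

    name: str
    # A BaseTrigger instance when the DAG is authored by a user;
    # a serialized dict when the DAG is rebuilt from the DB.
    trigger: Any


# Deserialized shape: trigger is plain serialized data, not a live object.
restored = WatcherSketch(name="w", trigger={"classpath": "x.Y", "kwargs": {}})
assert isinstance(restored.trigger, dict)
```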



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org