ashb commented on code in PR #45562:
URL: https://github.com/apache/airflow/pull/45562#discussion_r1925543932


##########
airflow/example_dags/example_asset_with_watchers.py:
##########
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher
 
-file_path = tempfile.NamedTemporaryFile().name
+file_path = "/tmp/test"

Review Comment:
   Did we need to change this?
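   For context on the question, the two approaches behave quite differently; a small standalone illustration (not Airflow code, and the relevance of cross-process path stability to this DAG is my assumption):

   ```python
   import os
   import tempfile

   # tempfile.NamedTemporaryFile() creates a real file, but with no reference
   # kept to the handle, CPython closes (and deletes) it immediately, so only
   # a unique, already-dangling path string survives.
   tmp_path = tempfile.NamedTemporaryFile().name
   print(os.path.exists(tmp_path))  # False on CPython: the file is already gone

   # A hardcoded path such as "/tmp/test" stays the same across processes,
   # which matters when another process (e.g. the triggerer) must watch it.
   fixed_path = "/tmp/test"
   ```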



##########
airflow/example_dags/example_asset_with_watchers.py:
##########
@@ -21,15 +21,14 @@
 from __future__ import annotations
 
 import os
-import tempfile
 
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset import Asset, AssetWatcher

Review Comment:
   For things in the "public" api, designed directly for use in DAGs we should add these to the lazy imports in TaskSDK's init and make this:
   
   ```suggestion
   from airflow.sdk import Asset, AssetWatcher
   ```
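   One common way to implement such lazy imports is a module-level `__getattr__` (PEP 562). A minimal, self-contained sketch of the mechanism, using stand-in modules rather than the real `airflow.sdk` package (all names here are illustrative):

   ```python
   import importlib
   import sys
   import types

   # Stand-in for the defining module (the real one would be
   # airflow.sdk.definitions.asset).
   defs = types.ModuleType("fake_defs")
   defs.Asset = type("Asset", (), {})
   defs.AssetWatcher = type("AssetWatcher", (), {})
   sys.modules["fake_defs"] = defs

   # Stand-in for the package __init__ (the real one would be airflow/sdk/__init__.py).
   sdk = types.ModuleType("fake_sdk")
   _lazy_imports = {"Asset": "fake_defs", "AssetWatcher": "fake_defs"}

   def _module_getattr(name):
       # PEP 562: called only when the name is not already in the module dict,
       # so the defining module is imported on first access and then cached.
       try:
           module_name = _lazy_imports[name]
       except KeyError:
           raise AttributeError(name) from None
       value = getattr(importlib.import_module(module_name), name)
       setattr(sdk, name, value)  # cache so later lookups skip __getattr__
       return value

   sdk.__getattr__ = _module_getattr
   sys.modules["fake_sdk"] = sdk

   import fake_sdk  # noqa: E402

   print(fake_sdk.Asset)  # resolved lazily on first attribute access
   ```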



##########
airflow/serialization/serialized_objects.py:
##########
@@ -283,7 +305,17 @@ def decode_asset_condition(var: dict[str, Any]) -> BaseAsset:
     """
     dat = var["__type"]
     if dat == DAT.ASSET:
-        return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])
+        serialized_watchers = var["watchers"] if "watchers" in var else []
+        return Asset(
+            name=var["name"],
+            uri=var["uri"],
+            group=var["group"],
+            extra=var["extra"],

Review Comment:
   Consistency please -- There is another deserialize (why are there two? Separate question I guess) but
   ```suggestion
               **var,
   ```
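   For readers following along, the two construction styles can be sketched with a stand-in class (this is not the real `Asset`, and the assumption that serializer bookkeeping keys like `__type` need stripping before unpacking is mine):

   ```python
   from dataclasses import dataclass, field

   @dataclass
   class Asset:  # stand-in with the same field names, not airflow's Asset
       name: str
       uri: str
       group: str
       extra: dict = field(default_factory=dict)
       watchers: list = field(default_factory=list)

   var = {"__type": "asset", "name": "a", "uri": "s3://bucket/a", "group": "g", "extra": {}}

   # Field-by-field, as in the diff:
   explicit = Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])

   # Keyword unpacking in the spirit of the `**var` suggestion, after dropping
   # keys the constructor does not accept:
   unpacked = Asset(**{k: v for k, v in var.items() if not k.startswith("__")})

   print(explicit == unpacked)  # True
   ```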



##########
airflow/dag_processing/collection.py:
##########
@@ -737,16 +735,19 @@ def add_asset_trigger_references(
         # Update references from assets being used
         refs_to_add: dict[tuple[str, str], set[int]] = {}
         refs_to_remove: dict[tuple[str, str], set[int]] = {}
-        triggers: dict[int, BaseTrigger] = {}
+        triggers: dict[int, dict] = {}
 
         # Optimization: if no asset collected, skip fetching active assets
         active_assets = _find_active_assets(self.assets.keys(), session=session) if self.assets else {}
 
         for name_uri, asset in self.assets.items():
             # If the asset belong to a DAG not active or paused, consider there is no watcher associated to it
-            asset_watchers = asset.watchers if name_uri in active_assets else []
-            trigger_hash_to_trigger_dict: dict[int, BaseTrigger] = {
-                self._get_base_trigger_hash(trigger): trigger for trigger in asset_watchers
+            asset_watchers: list[AssetWatcher] = asset.watchers if name_uri in active_assets else []
+            trigger_hash_to_trigger_dict: dict[int, dict] = {
+                self._get_trigger_hash(
+                    cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
+                ): cast(dict, watcher.trigger)

Review Comment:
   These `cast`s look odd. What's going on here?
   
   At least this looks like it should work?
   ```suggestion
                       watcher.trigger["classpath"], watcher.trigger["kwargs"]
                   ): cast(dict, watcher.trigger)
   ```
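   For context on why these casts read oddly: `typing.cast` does nothing at runtime, so whatever is going on is purely an annotation issue (presumably `watcher.trigger` is typed as a union rather than plain `dict` -- an assumption on my part). A minimal demonstration:

   ```python
   from typing import cast

   trigger = {"classpath": "pkg.FileTrigger", "kwargs": {"filepath": "/tmp/test"}}

   # cast() is a no-op at runtime: it returns its argument unchanged and only
   # tells the static type checker to treat the value as the given type.
   same = cast(dict, trigger)
   print(same is trigger)  # True: no copy, no conversion
   ```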



##########
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##########
@@ -257,6 +258,17 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
         raise NotImplementedError
 
 
+@attrs.define(frozen=True)
+class AssetWatcher:
+    """A representation of an asset watcher. The name uniquely identity the 
watch."""
+
+    name: str
+    # This attribute serves double purpose. For a "normal" asset instance
+    # loaded from DAG, this holds the trigger used to monitor an external resource.
+    # For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.

Review Comment:
   I don't think we should make this class do double duty like this -- We already have BaseSerializedOperator etc. and I want to try very hard to avoid putting a run-time dependency on TaskSDK code in the core Scheduler.
   
   i.e. I'd like the scheduler to operate only on a class that is deserialized from the DB that does not inherit from this class.
   
   (I have yet to do that for SerializedDAG and SerializedBaseOperator, but I need to before we can release 3.0)


