This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d7c3e7d9c7 Add asset event emission listener event (#61718)
9d7c3e7d9c7 is described below

commit 9d7c3e7d9c7365c10f3267b660ff6e01f9b1a240
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Feb 11 12:46:16 2026 +0800

    Add asset event emission listener event (#61718)
---
 airflow-core/src/airflow/assets/manager.py         | 32 +++++++++++---
 airflow-core/src/airflow/listeners/spec/asset.py   | 11 +++++
 .../airflow/listeners/{spec/asset.py => types.py}  | 29 ++++++-------
 .../tests/unit/listeners/asset_listener.py         | 11 ++++-
 .../tests/unit/listeners/test_asset_listener.py    | 50 ++++++++++++++++------
 5 files changed, 96 insertions(+), 37 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index cfb02b096b0..b7b0ce5c239 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -28,6 +28,7 @@ from sqlalchemy.orm import joinedload
 from airflow._shared.observability.metrics.stats import Stats
 from airflow.configuration import conf
 from airflow.listeners.listener import get_listener_manager
+from airflow.listeners.types import AssetEvent as ListenerAssetEvent
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
@@ -234,7 +235,7 @@ class AssetManager(LoggingMixin):
 
         dags_to_queue_from_asset_alias = set()
         if source_alias_names:
-            asset_alias_models = session.scalars(
+            asset_alias_models: Iterable[AssetAliasModel] = session.scalars(
                 select(AssetAliasModel)
                 .where(AssetAliasModel.name.in_(source_alias_names))
                 .options(
@@ -251,6 +252,8 @@ class AssetManager(LoggingMixin):
                     for alias_ref in asset_alias_model.scheduled_dags
                     if not alias_ref.dag.is_paused
                 }
+        else:
+            asset_alias_models = []
 
         dags_to_queue_from_asset_ref = set(
             session.scalars(
@@ -267,7 +270,20 @@ class AssetManager(LoggingMixin):
             )
         )
 
-        cls.notify_asset_changed(asset=asset_model.to_serialized())
+        asset = asset_model.to_serialized()
+        cls.notify_asset_changed(asset=asset)
+        cls.nofity_asset_event_emitted(
+            asset_event=ListenerAssetEvent(
+                asset=asset,
+                extra=asset_event.extra,
+                source_dag_id=asset_event.source_dag_id,
+                source_task_id=asset_event.source_task_id,
+                source_run_id=asset_event.source_run_id,
+                source_map_index=asset_event.source_map_index,
+                source_aliases=[aam.to_serialized() for aam in asset_alias_models],
+                partition_key=partition_key,
+            )
+        )
 
         Stats.incr("asset.updates")
 
@@ -304,14 +320,18 @@ class AssetManager(LoggingMixin):
     def notify_asset_changed(asset: SerializedAsset) -> None:
         """Run applicable notification actions when an asset is changed."""
         try:
-            # TODO: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)
-            #  maybe we should just add the event to the signature
-            #  or add a new hook `on_asset_event`
-            #  https://github.com/apache/airflow/issues/58290
             get_listener_manager().hook.on_asset_changed(asset=asset)
         except Exception:
             log.exception("error calling listener")
 
+    @staticmethod
+    def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None:
+        """Run applicable notification actions when an asset event is emitted."""
+        try:
+            get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event)
+        except Exception:
+            log.exception("error calling listener")
+
     @classmethod
     def _queue_dagruns(
         cls,
diff --git a/airflow-core/src/airflow/listeners/spec/asset.py b/airflow-core/src/airflow/listeners/spec/asset.py
index 05ba0809bcd..ad53e08629f 100644
--- a/airflow-core/src/airflow/listeners/spec/asset.py
+++ b/airflow-core/src/airflow/listeners/spec/asset.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
 from pluggy import HookspecMarker
 
 if TYPE_CHECKING:
+    from airflow.listeners.types import AssetEvent
     from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
 
 hookspec = HookspecMarker("airflow")
@@ -41,3 +42,13 @@ def on_asset_alias_created(asset_alias: SerializedAssetAlias):
 @hookspec
 def on_asset_changed(asset: SerializedAsset):
     """Execute when asset change is registered."""
+
+
+@hookspec
+def on_asset_event_emitted(asset_event: AssetEvent):
+    """
+    Execute when an asset event is emitted.
+
+    This is generally called together with ``on_asset_changed``, but with
+    information on the emitted event instead.
+    """
diff --git a/airflow-core/src/airflow/listeners/spec/asset.py b/airflow-core/src/airflow/listeners/types.py
similarity index 70%
copy from airflow-core/src/airflow/listeners/spec/asset.py
copy to airflow-core/src/airflow/listeners/types.py
index 05ba0809bcd..120b8ef503a 100644
--- a/airflow-core/src/airflow/listeners/spec/asset.py
+++ b/airflow-core/src/airflow/listeners/types.py
@@ -20,24 +20,23 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from pluggy import HookspecMarker
+import attrs
 
 if TYPE_CHECKING:
-    from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
-
-hookspec = HookspecMarker("airflow")
-
-
-@hookspec
-def on_asset_created(asset: SerializedAsset):
-    """Execute when a new asset is created."""
+    from pydantic import JsonValue
 
+    from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
 
-@hookspec
-def on_asset_alias_created(asset_alias: SerializedAssetAlias):
-    """Execute when a new asset alias is created."""
 
[email protected]
+class AssetEvent:
+    """Asset event representation for asset listener hooks."""
 
-@hookspec
-def on_asset_changed(asset: SerializedAsset):
-    """Execute when asset change is registered."""
+    asset: SerializedAsset
+    extra: dict[str, JsonValue]
+    source_dag_id: str | None
+    source_task_id: str | None
+    source_run_id: str | None
+    source_map_index: int | None
+    source_aliases: list[SerializedAssetAlias]
+    partition_key: str | None
diff --git a/airflow-core/tests/unit/listeners/asset_listener.py b/airflow-core/tests/unit/listeners/asset_listener.py
index e03b34a773a..bdd5551018c 100644
--- a/airflow-core/tests/unit/listeners/asset_listener.py
+++ b/airflow-core/tests/unit/listeners/asset_listener.py
@@ -23,6 +23,7 @@ from airflow.listeners import hookimpl
 
 changed = []
 created = []
+emitted = []
 
 
 @hookimpl
@@ -30,11 +31,17 @@ def on_asset_changed(asset):
     changed.append(copy.deepcopy(asset))
 
 
+@hookimpl
+def on_asset_event_emitted(asset_event):
+    emitted.append(copy.deepcopy(asset_event))
+
+
 @hookimpl
 def on_asset_created(asset):
     created.append(copy.deepcopy(asset))
 
 
 def clear():
-    global changed, created
-    changed, created = [], []
+    changed.clear()
+    created.clear()
+    emitted.clear()
diff --git a/airflow-core/tests/unit/listeners/test_asset_listener.py b/airflow-core/tests/unit/listeners/test_asset_listener.py
index b2ce78c2443..0de2588d331 100644
--- a/airflow-core/tests/unit/listeners/test_asset_listener.py
+++ b/airflow-core/tests/unit/listeners/test_asset_listener.py
@@ -14,15 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
 import pytest
 
+from airflow.listeners.types import AssetEvent
 from airflow.models.asset import AssetModel
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.utils.session import provide_session
+from airflow.sdk import Asset
+from airflow.serialization.encoders import ensure_serialized_asset
 
+from tests_common.test_utils.db import clear_db_assets
 from unit.listeners import asset_listener
 
 
@@ -33,31 +36,50 @@ def clean_listener_state():
     asset_listener.clear()
 
 
[email protected]_test
-@provide_session
-def test_asset_listener_on_asset_changed_gets_calls(
-    create_task_instance_of_operator, session, listener_manager
-):
-    listener_manager(asset_listener)
[email protected]
+def asset(session):
     asset_uri = "test://asset/"
     asset_name = "test_asset_uri"
     asset_group = "test-group"
     asset = Asset(uri=asset_uri, name=asset_name, group=asset_group)
     asset_model = AssetModel(uri=asset_uri, name=asset_name, group=asset_group)
     session.add(asset_model)
-
     session.flush()
+    yield asset
+    clear_db_assets()
 
-    ti = create_task_instance_of_operator(
+
[email protected]
+def ti(create_task_instance_of_operator, asset, session):
+    return create_task_instance_of_operator(
         operator_class=EmptyOperator,
         dag_id="producing_dag",
         task_id="test_task",
         session=session,
         outlets=[asset],
     )
+
+
[email protected]_test
+def test_asset_listener_on_asset_changed(asset, ti, listener_manager):
+    listener_manager(asset_listener)
     ti.run()
+    assert asset_listener.changed == [ensure_serialized_asset(asset)]
+
 
-    assert len(asset_listener.changed) == 1
-    assert asset_listener.changed[0].uri == asset_uri
-    assert asset_listener.changed[0].name == asset_name
-    assert asset_listener.changed[0].group == asset_group
[email protected]_test
+def test_asset_listener_on_asset_event_emitted(asset, ti, listener_manager):
+    listener_manager(asset_listener)
+    ti.run()
+    assert asset_listener.emitted == [
+        AssetEvent(
+            asset=ensure_serialized_asset(asset),
+            extra={},
+            source_dag_id=ti.dag_id,
+            source_task_id=ti.task_id,
+            source_run_id=ti.run_id,
+            source_map_index=ti.map_index,
+            source_aliases=[],
+            partition_key=None,
+        )
+    ]

Reply via email to