This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20bb220423d Fix partitioned asset events incorrectly triggering 
non-partition-aware Dags (#63848)
20bb220423d is described below

commit 20bb220423d768273147fcf8faa48866f275ce44
Author: Shubham Gondane <[email protected]>
AuthorDate: Tue Mar 24 03:55:12 2026 -0700

    Fix partitioned asset events incorrectly triggering non-partition-aware 
Dags (#63848)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow-core/src/airflow/assets/manager.py     |  2 +-
 airflow-core/tests/unit/assets/test_manager.py | 23 +++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index a599e79018e..dca3db9b181 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -357,7 +357,7 @@ class AssetManager(LoggingMixin):
         )
 
         non_partitioned_dags = dags_to_queue.difference(partition_dags)  # 
don't double process
-        if not non_partitioned_dags:
+        if not non_partitioned_dags or partition_key is not None:
             return None
 
         # Possible race condition: if multiple dags or multiple (usually
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index 82477042d86..9ab50bcb3e5 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -294,3 +294,26 @@ class TestAssetManager:
 
         queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
         assert queued_id == "stale_dag"
+
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def 
test_partitioned_asset_event_does_not_trigger_non_partitioned_dag(self, 
session, mock_task_instance):
+        """partitioned asset events (events with partition key) must not queue 
non-partition-aware Dags."""
+        asm = AssetModel(uri="test://asset/", name="test_asset", group="asset")
+        session.add(asm)
+        dag = DagModel(
+            dag_id="consumer_dag", is_paused=False, bundle_name="testing", 
timetable_partitioned=False
+        )
+        session.add(dag)
+        asm.scheduled_dags = [DagScheduleAssetReference(dag_id=dag.dag_id)]
+        session.execute(delete(AssetDagRunQueue))
+        session.flush()
+
+        AssetManager.register_asset_change(
+            task_instance=mock_task_instance,
+            asset=Asset(uri="test://asset/", name="test_asset"),
+            session=session,
+            partition_key="2024-01-01T00:00:00+00:00",
+        )
+        session.flush()
+
+        assert 
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0

Reply via email to