This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 20bb220423d Fix partitioned asset events incorrectly triggering
non-partition-aware Dags (#63848)
20bb220423d is described below
commit 20bb220423d768273147fcf8faa48866f275ce44
Author: Shubham Gondane <[email protected]>
AuthorDate: Tue Mar 24 03:55:12 2026 -0700
Fix partitioned asset events incorrectly triggering non-partition-aware
Dags (#63848)
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/assets/manager.py | 2 +-
airflow-core/tests/unit/assets/test_manager.py | 23 +++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index a599e79018e..dca3db9b181 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -357,7 +357,7 @@ class AssetManager(LoggingMixin):
)
non_partitioned_dags = dags_to_queue.difference(partition_dags) # don't double process
- if not non_partitioned_dags:
+ if not non_partitioned_dags or partition_key is not None:
return None
# Possible race condition: if multiple dags or multiple (usually
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 82477042d86..9ab50bcb3e5 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -294,3 +294,26 @@ class TestAssetManager:
queued_id = session.scalar(select(AssetDagRunQueue.target_dag_id))
assert queued_id == "stale_dag"
+
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_partitioned_asset_event_does_not_trigger_non_partitioned_dag(self, session, mock_task_instance):
+ """partitioned asset events (events with partition key) must not queue non-partition-aware Dags."""
+ asm = AssetModel(uri="test://asset/", name="test_asset", group="asset")
+ session.add(asm)
+ dag = DagModel(
+ dag_id="consumer_dag", is_paused=False, bundle_name="testing", timetable_partitioned=False
+ )
+ session.add(dag)
+ asm.scheduled_dags = [DagScheduleAssetReference(dag_id=dag.dag_id)]
+ session.execute(delete(AssetDagRunQueue))
+ session.flush()
+
+ AssetManager.register_asset_change(
+ task_instance=mock_task_instance,
+ asset=Asset(uri="test://asset/", name="test_asset"),
+ session=session,
+ partition_key="2024-01-01T00:00:00+00:00",
+ )
+ session.flush()
+
+ assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0