This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 49ef2f890b8 Avoid N+1 queries when bulk deleting variables (#68508)
49ef2f890b8 is described below

commit 49ef2f890b8e2a5c9323f82ef770441943f2816b
Author: Yuseok Jo <[email protected]>
AuthorDate: Mon Jun 15 00:45:24 2026 +0900

    Avoid N+1 queries when bulk deleting variables (#68508)
---
 .../core_api/services/public/variables.py          | 19 ++++++++--------
 .../core_api/routes/public/test_variables.py       | 26 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
index e363565bb92..95d52bfd805 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
@@ -100,17 +100,18 @@ def update_orm_from_pydantic(
 class BulkVariableService(BulkService[VariableBody]):
     """Service for handling bulk operations on variables."""
 
-    def categorize_keys(self, keys: set) -> tuple[set, set]:
+    def categorize_keys(self, keys: set) -> tuple[dict, set, set]:
         """Categorize the given keys into matched_keys and not_found_keys based on existing keys."""
-        existing_keys = {variable for variable in self.session.execute(select(Variable.key)).scalars()}
-        matched_keys = existing_keys & keys
-        not_found_keys = keys - existing_keys
-        return matched_keys, not_found_keys
+        existing_variables = self.session.execute(select(Variable).filter(Variable.key.in_(keys))).scalars()
+        existing_variables_dict = {variable.key: variable for variable in existing_variables}
+        matched_keys = set(existing_variables_dict.keys())
+        not_found_keys = keys - matched_keys
+        return existing_variables_dict, matched_keys, not_found_keys
 
     def handle_bulk_create(self, action: BulkCreateAction, results: BulkActionResponse) -> None:
         """Bulk create variables."""
         to_create_keys = {variable.key for variable in action.entities}
-        matched_keys, not_found_keys = self.categorize_keys(to_create_keys)
+        _, matched_keys, not_found_keys = self.categorize_keys(to_create_keys)
 
         try:
             if action.action_on_existence == BulkActionOnExistence.FAIL and matched_keys:
@@ -141,7 +142,7 @@ class BulkVariableService(BulkService[VariableBody]):
     def handle_bulk_update(self, action: BulkUpdateAction, results: BulkActionResponse) -> None:
         """Bulk Update variables."""
         to_update_keys = {variable.key for variable in action.entities}
-        matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
+        _, matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
         try:
             if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found_keys:
                 raise HTTPException(
@@ -171,7 +172,7 @@ class BulkVariableService(BulkService[VariableBody]):
     def handle_bulk_delete(self, action: BulkDeleteAction, results: BulkActionResponse) -> None:
         """Bulk delete variables."""
         to_delete_keys = set(action.entities)
-        matched_keys, not_found_keys = self.categorize_keys(to_delete_keys)
+        existing_variables_dict, matched_keys, not_found_keys = self.categorize_keys(to_delete_keys)
 
         try:
             if action.action_on_non_existence == BulkActionNotOnExistence.FAIL and not_found_keys:
@@ -185,7 +186,7 @@ class BulkVariableService(BulkService[VariableBody]):
                 delete_keys = to_delete_keys
 
             for key in delete_keys:
-                existing_variable = self.session.scalar(select(Variable).where(Variable.key == key).limit(1))
+                existing_variable = existing_variables_dict.get(key)
                 if existing_variable:
                     self.session.delete(existing_variable)
                     results.success.append(key)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
index 51ec35ac493..ed480b03602 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
@@ -25,6 +25,9 @@ import pytest
 from sqlalchemy import select
 from sqlalchemy.orm import Session
 
+from airflow.api_fastapi.core_api.datamodels.common import BulkBody
+from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
+from airflow.api_fastapi.core_api.services.public.variables import BulkVariableService
 from airflow.models.team import Team
 from airflow.models.variable import Variable
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -1503,3 +1506,26 @@ class TestBulkVariables(TestVariableEndpoint):
 
         expected_error_keys = {err["input"]["key"] for err in detail}
         assert sorted(expected_error_keys) == ["var_2", "var_3"]
+
+    @pytest.mark.parametrize("num_variables", [1, 25])
+    def test_bulk_delete_resolves_existence_in_single_query(self, session, num_variables):
+        """Bulk delete looks up all targeted variables in one query, not one per key (no N+1)."""
+        keys = [f"bulk_delete_var_{i}" for i in range(num_variables)]
+        for key in keys:
+            Variable.set(key=key, value="value", session=session)
+        session.commit()
+
+        request = BulkBody[VariableBody].model_validate(
+            {"actions": [{"action": "delete", "entities": keys, "action_on_non_existence": "skip"}]}
+        )
+        service = BulkVariableService(session=session, request=request)
+
+        # Only the single existence-lookup SELECT runs here; deletes flush later on commit.
+        with assert_queries_count(1):
+            response = service.handle_request()
+
+        assert response.delete is not None
+        assert sorted(response.delete.success) == sorted(keys)
+
+        session.commit()
+        assert session.scalars(select(Variable.key).where(Variable.key.in_(keys))).all() == []

Reply via email to