This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 49ef2f890b8 Avoid N+1 queries when bulk deleting variables (#68508)
49ef2f890b8 is described below
commit 49ef2f890b8e2a5c9323f82ef770441943f2816b
Author: Yuseok Jo <[email protected]>
AuthorDate: Mon Jun 15 00:45:24 2026 +0900
Avoid N+1 queries when bulk deleting variables (#68508)
---
.../core_api/services/public/variables.py | 19 ++++++++--------
.../core_api/routes/public/test_variables.py | 26 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
index e363565bb92..95d52bfd805 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py
@@ -100,17 +100,18 @@ def update_orm_from_pydantic(
class BulkVariableService(BulkService[VariableBody]):
"""Service for handling bulk operations on variables."""
- def categorize_keys(self, keys: set) -> tuple[set, set]:
+ def categorize_keys(self, keys: set) -> tuple[dict, set, set]:
"""Categorize the given keys into matched_keys and not_found_keys
based on existing keys."""
- existing_keys = {variable for variable in
self.session.execute(select(Variable.key)).scalars()}
- matched_keys = existing_keys & keys
- not_found_keys = keys - existing_keys
- return matched_keys, not_found_keys
+ existing_variables =
self.session.execute(select(Variable).filter(Variable.key.in_(keys))).scalars()
+ existing_variables_dict = {variable.key: variable for variable in
existing_variables}
+ matched_keys = set(existing_variables_dict.keys())
+ not_found_keys = keys - matched_keys
+ return existing_variables_dict, matched_keys, not_found_keys
def handle_bulk_create(self, action: BulkCreateAction, results:
BulkActionResponse) -> None:
"""Bulk create variables."""
to_create_keys = {variable.key for variable in action.entities}
- matched_keys, not_found_keys = self.categorize_keys(to_create_keys)
+ _, matched_keys, not_found_keys = self.categorize_keys(to_create_keys)
try:
if action.action_on_existence == BulkActionOnExistence.FAIL and
matched_keys:
@@ -141,7 +142,7 @@ class BulkVariableService(BulkService[VariableBody]):
def handle_bulk_update(self, action: BulkUpdateAction, results:
BulkActionResponse) -> None:
"""Bulk Update variables."""
to_update_keys = {variable.key for variable in action.entities}
- matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
+ _, matched_keys, not_found_keys = self.categorize_keys(to_update_keys)
try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_keys:
raise HTTPException(
@@ -171,7 +172,7 @@ class BulkVariableService(BulkService[VariableBody]):
def handle_bulk_delete(self, action: BulkDeleteAction, results:
BulkActionResponse) -> None:
"""Bulk delete variables."""
to_delete_keys = set(action.entities)
- matched_keys, not_found_keys = self.categorize_keys(to_delete_keys)
+ existing_variables_dict, matched_keys, not_found_keys =
self.categorize_keys(to_delete_keys)
try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_keys:
@@ -185,7 +186,7 @@ class BulkVariableService(BulkService[VariableBody]):
delete_keys = to_delete_keys
for key in delete_keys:
- existing_variable =
self.session.scalar(select(Variable).where(Variable.key == key).limit(1))
+ existing_variable = existing_variables_dict.get(key)
if existing_variable:
self.session.delete(existing_variable)
results.success.append(key)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
index 51ec35ac493..ed480b03602 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py
@@ -25,6 +25,9 @@ import pytest
from sqlalchemy import select
from sqlalchemy.orm import Session
+from airflow.api_fastapi.core_api.datamodels.common import BulkBody
+from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
+from airflow.api_fastapi.core_api.services.public.variables import
BulkVariableService
from airflow.models.team import Team
from airflow.models.variable import Variable
from airflow.utils.session import NEW_SESSION, provide_session
@@ -1503,3 +1506,26 @@ class TestBulkVariables(TestVariableEndpoint):
expected_error_keys = {err["input"]["key"] for err in detail}
assert sorted(expected_error_keys) == ["var_2", "var_3"]
+
+ @pytest.mark.parametrize("num_variables", [1, 25])
+ def test_bulk_delete_resolves_existence_in_single_query(self, session,
num_variables):
+ """Bulk delete looks up all targeted variables in one query, not one
per key (no N+1)."""
+ keys = [f"bulk_delete_var_{i}" for i in range(num_variables)]
+ for key in keys:
+ Variable.set(key=key, value="value", session=session)
+ session.commit()
+
+ request = BulkBody[VariableBody].model_validate(
+ {"actions": [{"action": "delete", "entities": keys,
"action_on_non_existence": "skip"}]}
+ )
+ service = BulkVariableService(session=session, request=request)
+
+ # Only the single existence-lookup SELECT runs here; deletes flush later on commit.
+ with assert_queries_count(1):
+ response = service.handle_request()
+
+ assert response.delete is not None
+ assert sorted(response.delete.success) == sorted(keys)
+
+ session.commit()
+ assert
session.scalars(select(Variable.key).where(Variable.key.in_(keys))).all() == []