This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 44900b84a9c fix: Replace expunge_all with expunge in MetastoreBackend
(#63080)
44900b84a9c is described below
commit 44900b84a9c9fcdbb979e91543a51d777f15e34a
Author: Shivam Rastogi <[email protected]>
AuthorDate: Tue Mar 10 09:15:09 2026 -0700
fix: Replace expunge_all with expunge in MetastoreBackend (#63080)
MetastoreBackend.get_connection() and get_variable() called
session.expunge_all() to detach the returned object from the session.
This removed all objects from the shared scoped session, including
unrelated pending objects added by other code sharing the same
thread-local session.
This caused team-scoped DAG bundles to silently fail to persist when
sync_bundles_to_db triggered a connection lookup through S3DagBundle's
view_url_template, which initializes an S3Hook and calls get_connection.
Replace expunge_all() with expunge(obj) to only detach the specific
queried Connection or Variable, leaving all other session state intact.
closes: #62244
---
airflow-core/src/airflow/secrets/metastore.py | 5 +-
.../tests/unit/always/test_secrets_metastore.py | 110 +++++++++++++++++++++
2 files changed, 113 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/secrets/metastore.py
b/airflow-core/src/airflow/secrets/metastore.py
index bde8423158d..e7f25a9f917 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -57,7 +57,8 @@ class MetastoreBackend(BaseSecretsBackend):
)
.limit(1)
)
- session.expunge_all()
+ if conn:
+ session.expunge(conn)
return conn
@provide_session
@@ -79,7 +80,7 @@ class MetastoreBackend(BaseSecretsBackend):
.where(Variable.key == key, or_(Variable.team_name == team_name,
Variable.team_name.is_(None)))
.limit(1)
)
- session.expunge_all()
if var_value:
+ session.expunge(var_value)
return var_value.val
return None
diff --git a/airflow-core/tests/unit/always/test_secrets_metastore.py
b/airflow-core/tests/unit/always/test_secrets_metastore.py
new file mode 100644
index 00000000000..d13419563f7
--- /dev/null
+++ b/airflow-core/tests/unit/always/test_secrets_metastore.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.connection import Connection
+from airflow.models.variable import Variable
+from airflow.secrets.metastore import MetastoreBackend
+from airflow.utils.session import create_session
+
+from tests_common.test_utils.db import clear_db_connections, clear_db_variables
+
+pytestmark = pytest.mark.db_test
+
+
+class TestMetastoreBackendSessionSafety:
+ """MetastoreBackend must not corrupt the shared scoped session.
+
+ Regression tests for https://github.com/apache/airflow/issues/62244.
+ """
+
+ def setup_method(self) -> None:
+ clear_db_connections()
+ clear_db_variables()
+
+ def teardown_method(self) -> None:
+ clear_db_connections()
+ clear_db_variables()
+
+ @pytest.mark.parametrize("conn_exists", [True, False], ids=["found",
"not_found"])
+ def test_get_connection_preserves_pending_session_objects(self,
conn_exists):
+ """get_connection must not remove unrelated pending objects from
session.new."""
+ if conn_exists:
+ with create_session() as session:
+ session.add(Connection(conn_id="target_conn",
conn_type="mysql"))
+ session.commit()
+
+ with create_session() as session:
+ # Simulate pending work from another function sharing the session
+ pending = Connection(conn_id="pending_conn", conn_type="http")
+ session.add(pending)
+
+ # Same session passed to simulate shared scoped session behavior
+ backend = MetastoreBackend()
+ result = backend.get_connection("target_conn", session=session)
+
+ if conn_exists:
+ assert result is not None
+ assert result.conn_id == "target_conn"
+ else:
+ assert result is None
+ # The pending object must still be in session.new — expunge(conn)
should only
+ # detach the queried Connection, not wipe unrelated pending
objects.
+ assert pending in session.new
+
+ @pytest.mark.parametrize("var_exists", [True, False], ids=["found",
"not_found"])
+ def test_get_variable_preserves_pending_session_objects(self, var_exists):
+ """get_variable must not remove unrelated pending objects from
session.new."""
+ if var_exists:
+ Variable.set(key="test_key", value="test_value")
+
+ with create_session() as session:
+ # Use any ORM model as the pending object to detect session
corruption
+ pending = Connection(conn_id="pending_conn", conn_type="http")
+ session.add(pending)
+
+ backend = MetastoreBackend()
+ result = backend.get_variable("test_key", session=session)
+
+ if var_exists:
+ assert result == "test_value"
+ else:
+ assert result is None
+ # The pending object must still be in session.new —
expunge(var_value) should only
+ # detach the queried Variable, not wipe unrelated pending objects.
+ assert pending in session.new
+
+ def test_get_connection_returns_detached_object(self):
+ """Returned connection must be detached so callers can use it
freely."""
+ from sqlalchemy import inspect as sa_inspect
+
+ with create_session() as session:
+ session.add(Connection(conn_id="test_conn", conn_type="mysql",
host="localhost"))
+ session.commit()
+
+ backend = MetastoreBackend()
+ conn = backend.get_connection("test_conn")
+
+ assert conn is not None
+ # Object should be detached — not tracked by any session
+ assert sa_inspect(conn).detached
+ # Attributes should still be accessible
+ assert conn.conn_id == "test_conn"
+ assert conn.host == "localhost"