This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 44900b84a9c fix: Replace expunge_all with expunge in MetastoreBackend 
(#63080)
44900b84a9c is described below

commit 44900b84a9c9fcdbb979e91543a51d777f15e34a
Author: Shivam Rastogi <[email protected]>
AuthorDate: Tue Mar 10 09:15:09 2026 -0700

    fix: Replace expunge_all with expunge in MetastoreBackend (#63080)
    
    MetastoreBackend.get_connection() and get_variable() called
    session.expunge_all() to detach the returned object from the session.
    This removed all objects from the shared scoped session, including
    unrelated pending objects added by other code sharing the same
    thread-local session.
    
    This caused team-scoped DAG bundles to silently fail to persist when
    sync_bundles_to_db triggered a connection lookup through S3DagBundle's
    view_url_template, which initializes an S3Hook and calls get_connection.
    
    Replace expunge_all() with expunge(obj) to only detach the specific
    queried Connection or Variable, leaving all other session state intact.
    
    closes: #62244
---
 airflow-core/src/airflow/secrets/metastore.py      |   5 +-
 .../tests/unit/always/test_secrets_metastore.py    | 110 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/secrets/metastore.py 
b/airflow-core/src/airflow/secrets/metastore.py
index bde8423158d..e7f25a9f917 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -57,7 +57,8 @@ class MetastoreBackend(BaseSecretsBackend):
             )
             .limit(1)
         )
-        session.expunge_all()
+        if conn:
+            session.expunge(conn)
         return conn
 
     @provide_session
@@ -79,7 +80,7 @@ class MetastoreBackend(BaseSecretsBackend):
             .where(Variable.key == key, or_(Variable.team_name == team_name, 
Variable.team_name.is_(None)))
             .limit(1)
         )
-        session.expunge_all()
         if var_value:
+            session.expunge(var_value)
             return var_value.val
         return None
diff --git a/airflow-core/tests/unit/always/test_secrets_metastore.py 
b/airflow-core/tests/unit/always/test_secrets_metastore.py
new file mode 100644
index 00000000000..d13419563f7
--- /dev/null
+++ b/airflow-core/tests/unit/always/test_secrets_metastore.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.connection import Connection
+from airflow.models.variable import Variable
+from airflow.secrets.metastore import MetastoreBackend
+from airflow.utils.session import create_session
+
+from tests_common.test_utils.db import clear_db_connections, clear_db_variables
+
+pytestmark = pytest.mark.db_test
+
+
+class TestMetastoreBackendSessionSafety:
+    """MetastoreBackend must not corrupt the shared scoped session.
+
+    Regression tests for https://github.com/apache/airflow/issues/62244.
+    """
+
+    def setup_method(self) -> None:
+        clear_db_connections()
+        clear_db_variables()
+
+    def teardown_method(self) -> None:
+        clear_db_connections()
+        clear_db_variables()
+
+    @pytest.mark.parametrize("conn_exists", [True, False], ids=["found", 
"not_found"])
+    def test_get_connection_preserves_pending_session_objects(self, 
conn_exists):
+        """get_connection must not remove unrelated pending objects from 
session.new."""
+        if conn_exists:
+            with create_session() as session:
+                session.add(Connection(conn_id="target_conn", 
conn_type="mysql"))
+                session.commit()
+
+        with create_session() as session:
+            # Simulate pending work from another function sharing the session
+            pending = Connection(conn_id="pending_conn", conn_type="http")
+            session.add(pending)
+
+            # Same session passed to simulate shared scoped session behavior
+            backend = MetastoreBackend()
+            result = backend.get_connection("target_conn", session=session)
+
+            if conn_exists:
+                assert result is not None
+                assert result.conn_id == "target_conn"
+            else:
+                assert result is None
+            # The pending object must still be in session.new — expunge(conn) 
should only
+            # detach the queried Connection, not wipe unrelated pending 
objects.
+            assert pending in session.new
+
+    @pytest.mark.parametrize("var_exists", [True, False], ids=["found", 
"not_found"])
+    def test_get_variable_preserves_pending_session_objects(self, var_exists):
+        """get_variable must not remove unrelated pending objects from 
session.new."""
+        if var_exists:
+            Variable.set(key="test_key", value="test_value")
+
+        with create_session() as session:
+            # Use any ORM model as the pending object to detect session 
corruption
+            pending = Connection(conn_id="pending_conn", conn_type="http")
+            session.add(pending)
+
+            backend = MetastoreBackend()
+            result = backend.get_variable("test_key", session=session)
+
+            if var_exists:
+                assert result == "test_value"
+            else:
+                assert result is None
+            # The pending object must still be in session.new — 
expunge(var_value) should only
+            # detach the queried Variable, not wipe unrelated pending objects.
+            assert pending in session.new
+
+    def test_get_connection_returns_detached_object(self):
+        """Returned connection must be detached so callers can use it 
freely."""
+        from sqlalchemy import inspect as sa_inspect
+
+        with create_session() as session:
+            session.add(Connection(conn_id="test_conn", conn_type="mysql", 
host="localhost"))
+            session.commit()
+
+        backend = MetastoreBackend()
+        conn = backend.get_connection("test_conn")
+
+        assert conn is not None
+        # Object should be detached — not tracked by any session
+        assert sa_inspect(conn).detached
+        # Attributes should still be accessible
+        assert conn.conn_id == "test_conn"
+        assert conn.host == "localhost"

Reply via email to