This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aec3744e15f fix(fab): recover from first idle MySQL disconnect in token auth (#62919)
aec3744e15f is described below

commit aec3744e15fff587b9be623b6b35f0840139d632
Author: Kushal Bohra <[email protected]>
AuthorDate: Tue Mar 10 10:37:59 2026 -0700

    fix(fab): recover from first idle MySQL disconnect in token auth (#62919)
    
    * fix(fab): recover from first idle MySQL disconnect in token auth
    
    Retry user deserialization once after clearing the poisoned scoped session
    so the first request after a server-side idle timeout does not return a 500.
    Add regression coverage for transient disconnect recovery and factor out the
    deserialization lookup logic to avoid duplication.
    
    * test(fab): rename retry mock for clarity
---
 .../providers/fab/auth_manager/fab_auth_manager.py | 21 ++++++++++++++++----
 .../unit/fab/auth_manager/test_fab_auth_manager.py | 23 ++++++++++++++++++++--
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py 
b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
index 99fcbeff134..2ad86559a74 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -267,16 +267,29 @@ class FabAuthManager(BaseAuthManager[User]):
 
     @cachedmethod(lambda self: self.cache, key=lambda _, token: 
int(token["sub"]))
     def deserialize_user(self, token: dict[str, Any]) -> User:
+        user_id = int(token["sub"])
+
+        def _fetch_user() -> User:
+            try:
+                return self.session.scalars(select(User).where(User.id == 
user_id)).one()
+            except NoResultFound:
+                raise ValueError(f"User with id {token['sub']} not found")
+
         try:
-            return self.session.scalars(select(User).where(User.id == 
int(token["sub"]))).one()
-        except NoResultFound:
-            raise ValueError(f"User with id {token['sub']} not found")
+            return _fetch_user()
         except SQLAlchemyError:
             # Discard the poisoned scoped session so the next request gets a
             # fresh connection from the pool instead of a PendingRollbackError.
             with suppress(Exception):
                 self.session.remove()
-            raise
+            try:
+                return _fetch_user()
+            except SQLAlchemyError:
+                # If retry also fails, remove the scoped session again to keep
+                # future requests from reusing a broken transaction state.
+                with suppress(Exception):
+                    self.session.remove()
+                raise
 
     def serialize_user(self, user: User) -> dict[str, Any]:
         return {"sub": str(user.id)}
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py 
b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index 3b3ebd4bbbe..99f7dc24edc 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -996,7 +996,7 @@ class TestDeserializeUserSessionCleanup:
         ids=["operational_error", "pending_rollback_error"],
     )
     def test_db_error_calls_session_remove(self, auth_manager_with_appbuilder, 
raised_exc):
-        """session.remove() is called on SQLAlchemy errors so the next request 
recovers."""
+        """session.remove() is called on SQLAlchemy errors before and after 
retry."""
         mock_session = MagicMock(spec=["scalars", "remove"])
         mock_session.scalars.side_effect = raised_exc
         auth_manager_with_appbuilder.cache.pop(99997, None)
@@ -1005,7 +1005,7 @@ class TestDeserializeUserSessionCleanup:
             with pytest.raises(type(raised_exc)):
                 auth_manager_with_appbuilder.deserialize_user({"sub": "99997"})
 
-        mock_session.remove.assert_called_once()
+        assert mock_session.remove.call_count == 2
 
     def test_db_error_propagates_when_session_remove_raises(self, 
auth_manager_with_appbuilder):
         """The original SQLAlchemyError propagates even if session.remove() 
itself raises."""
@@ -1021,6 +1021,25 @@ class TestDeserializeUserSessionCleanup:
             with pytest.raises(OperationalError):
                 auth_manager_with_appbuilder.deserialize_user({"sub": "99997"})
 
+        assert mock_session.remove.call_count == 2
+
+    def test_db_error_retries_once_and_recovers(self, 
auth_manager_with_appbuilder):
+        """A transient DB disconnect is recovered by removing session and 
retrying once."""
+        user = Mock()
+        user.id = 99996
+        original_exc = OperationalError("connection dropped", None, 
Exception())
+        retry_query_result = Mock()
+        retry_query_result.one.return_value = user
+
+        mock_session = MagicMock(spec=["scalars", "remove"])
+        mock_session.scalars.side_effect = [original_exc, retry_query_result]
+        auth_manager_with_appbuilder.cache.pop(user.id, None)
+
+        with self._patched_session(auth_manager_with_appbuilder, mock_session):
+            result = auth_manager_with_appbuilder.deserialize_user({"sub": 
str(user.id)})
+
+        assert result == user
+        assert mock_session.scalars.call_count == 2
         mock_session.remove.assert_called_once()
 
 

Reply via email to