This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new c1b6e5729fc Fix async engine missing pool_recycle and pool_pre_ping 
configuration (#65276) (#66866)
c1b6e5729fc is described below

commit c1b6e5729fca70c788beec8320687611b80ddc40
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:48:45 2026 +0530

    Fix async engine missing pool_recycle and pool_pre_ping configuration 
(#65276) (#66866)
    
    The async SQLAlchemy engine was created without any pool health
    settings, while the sync engine read pool_size, pool_recycle,
    pool_pre_ping, and max_overflow from the [database] config section.
    As a result, the async pool fell back to SQLAlchemy defaults
    (pool_recycle=-1, pool_pre_ping=False), so dead connections caused by
    PostgreSQL idle timeouts or pgbouncer disconnects were never detected.
    
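    The async engine now reads the same [database] config values as the
    sync engine; these pool arguments are not applied for SQLite URLs,
    which do not support them. SQL_ALCHEMY_POOL_ENABLED=False is also
    respected by using NullPool, matching the sync engine behavior.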
    
    (cherry picked from commit 981b0e218c5cf18ffc74cff21a991e20e34611a8)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow-core/src/airflow/settings.py          | 18 ++++++++
 airflow-core/tests/unit/core/test_settings.py | 65 +++++++++++++++++++++++++--
 2 files changed, 80 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index e42aa3ceff6..739e2437f6a 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -374,16 +374,20 @@ def create_async_metadata_engine(
     sql_alchemy_conn_async: str,
     *,
     connect_args: dict[str, Any],
+    engine_args: dict[str, Any] | None = None,
 ) -> AsyncEngine:
     """
     Create the async SQLAlchemy Engine for the Airflow metadata database.
 
     Override in ``airflow_local_settings.py`` to customize async engine 
creation.
     For ``do_connect`` handlers, register on ``engine.sync_engine``.
+
+    :param engine_args: Pool and engine configuration (pool_size, 
pool_recycle, etc.).
     """
     return create_async_engine(
         sql_alchemy_conn_async,
         connect_args=connect_args,
+        **(engine_args or {}),
         future=True,
     )
 
@@ -403,9 +407,23 @@ def _configure_async_session() -> None:
         AsyncSession = None
         return
 
+    # Apply the same pool health settings used by the sync engine.
+    # Without these, the async pool uses SQLAlchemy defaults (pool_recycle=-1,
+    # pool_pre_ping=False) which means dead connections from PostgreSQL idle
+    # timeouts or pgbouncer disconnects are never detected.
+    engine_args: dict[str, Any] = {}
+    if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
+        engine_args["poolclass"] = NullPool
+    elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
+        engine_args["pool_size"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
+        engine_args["pool_recycle"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
+        engine_args["pool_pre_ping"] = conf.getboolean("database", 
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
+        engine_args["max_overflow"] = conf.getint("database", 
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
+
     async_engine = create_async_metadata_engine(
         SQL_ALCHEMY_CONN_ASYNC,
         connect_args=_get_connect_args("async"),
+        engine_args=engine_args,
     )
     AsyncSession = async_sessionmaker(
         bind=async_engine,
diff --git a/airflow-core/tests/unit/core/test_settings.py 
b/airflow-core/tests/unit/core/test_settings.py
index 22cb00b36bb..588567181c5 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -27,6 +27,7 @@ from unittest.mock import MagicMock, call, patch
 import pytest
 from sqlalchemy.engine import Engine
 from sqlalchemy.ext.asyncio import AsyncEngine
+from sqlalchemy.pool import NullPool
 
 from airflow import settings
 from airflow.exceptions import AirflowClusterPolicyViolation, 
AirflowConfigException
@@ -262,18 +263,70 @@ class TestMetadataEngineHooks:
     def test_configure_async_session_delegates_to_create_async_metadata_engine(
         self, mock_create_async_engine
     ):
-        """_configure_async_session() must call 
create_async_metadata_engine."""
+        """_configure_async_session() must call create_async_metadata_engine 
with no pool args for sqlite."""
         from airflow import settings
 
         mock_create_async_engine.return_value = MagicMock()
 
-        with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"sqlite+aiosqlite://"):
+        with (
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"sqlite+aiosqlite://"),
+            patch("airflow.settings.conf") as mock_conf,
+        ):
+            # Pool enabled but sqlite -- pool args should be skipped
+            mock_conf.getboolean.return_value = True
             settings._configure_async_session()
 
         mock_create_async_engine.assert_called_once()
         call_kwargs = mock_create_async_engine.call_args
         assert call_kwargs[0][0] == "sqlite+aiosqlite://"
         assert "connect_args" in call_kwargs[1]
+        # sqlite doesn't support pool size args
+        assert call_kwargs[1]["engine_args"] == {}
+
+    @patch("airflow.settings.create_async_metadata_engine")
+    def test_configure_async_session_passes_pool_args_for_non_sqlite(self, 
mock_create_async_engine):
+        """_configure_async_session() must pass pool configuration for 
non-sqlite backends."""
+        from airflow import settings
+
+        mock_create_async_engine.return_value = MagicMock()
+
+        with (
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
+            patch("airflow.settings.conf") as mock_conf,
+        ):
+            mock_conf.getint.side_effect = lambda section, key, fallback=None: 
{
+                "SQL_ALCHEMY_POOL_SIZE": 10,
+                "SQL_ALCHEMY_POOL_RECYCLE": 900,
+                "SQL_ALCHEMY_MAX_OVERFLOW": 5,
+            }.get(key, fallback)
+            mock_conf.getboolean.return_value = True
+
+            settings._configure_async_session()
+
+        engine_args = mock_create_async_engine.call_args[1]["engine_args"]
+        assert engine_args["pool_size"] == 10
+        assert engine_args["pool_recycle"] == 900
+        assert engine_args["pool_pre_ping"] is True
+        assert engine_args["max_overflow"] == 5
+
+    @patch("airflow.settings.create_async_metadata_engine")
+    def test_configure_async_session_uses_nullpool_when_pool_disabled(self, 
mock_create_async_engine):
+        """_configure_async_session() must use NullPool when 
SQL_ALCHEMY_POOL_ENABLED is False."""
+        from airflow import settings
+
+        mock_create_async_engine.return_value = MagicMock()
+
+        with (
+            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
+            patch("airflow.settings.conf") as mock_conf,
+        ):
+            mock_conf.getboolean.return_value = False
+
+            settings._configure_async_session()
+
+        engine_args = mock_create_async_engine.call_args[1]["engine_args"]
+        assert engine_args["poolclass"] is NullPool
+        assert "pool_size" not in engine_args
 
     @patch("airflow.settings.create_async_metadata_engine")
     def test_configure_async_session_skips_when_no_async_conn(self, 
mock_create_async_engine):
@@ -313,12 +366,18 @@ class TestMetadataEngineHooks:
 
         mock_sa_create_async.return_value = MagicMock()
         connect_args = {"timeout": 30}
+        engine_args = {"pool_size": 5, "pool_recycle": 1800, "pool_pre_ping": 
True}
 
-        settings.create_async_metadata_engine("sqlite+aiosqlite://", 
connect_args=connect_args)
+        settings.create_async_metadata_engine(
+            "sqlite+aiosqlite://", connect_args=connect_args, 
engine_args=engine_args
+        )
 
         mock_sa_create_async.assert_called_once_with(
             "sqlite+aiosqlite://",
             connect_args={"timeout": 30},
+            pool_size=5,
+            pool_recycle=1800,
+            pool_pre_ping=True,
             future=True,
         )
 

Reply via email to