This is an automated email from the ASF dual-hosted git repository. msumit pushed a commit to branch sumit/ro_session in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6b3d36ced7a6f267d506eae6d50749177b7b030a Author: Sumit Maheshwari <[email protected]> AuthorDate: Wed Aug 20 15:29:21 2025 +0530 [AIP-94] Add support for read-only sql_alchemy sessions --- .../src/airflow/config_templates/config.yml | 21 +++++ airflow-core/src/airflow/models/deadline.py | 4 +- airflow-core/src/airflow/models/serialized_dag.py | 10 +-- airflow-core/src/airflow/settings.py | 95 +++++++++++++++++++++- airflow-core/src/airflow/utils/session.py | 66 +++++++++++++++ 5 files changed, 188 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 3a824fbcda6..22487e6d63b 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -544,6 +544,27 @@ database: sensitive: true example: "postgresql+asyncpg://postgres:airflow@postgres/airflow" default: ~ + sql_alchemy_conn_readonly: + description: | + The SQLAlchemy connection string to a read-only replica of the metadata database. + This is used for read-heavy operations to offload traffic from the master database. + If not set, read operations will use the same connection as ``sql_alchemy_conn``. + The read-only database should be a replica or read-only instance of the master database. + version_added: 3.1.0 + type: string + sensitive: true + example: "postgresql://postgres:airflow@postgres-readonly/airflow" + default: ~ + sql_alchemy_conn_readonly_async: + description: | + The SQLAlchemy connection string to a read-only replica of the metadata database for async connections. + If not set, Airflow will automatically derive this from ``sql_alchemy_conn_readonly`` if available, + or fall back to ``sql_alchemy_conn_async``. 
+ version_added: 3.1.0 + type: string + sensitive: true + example: "postgresql+asyncpg://postgres:airflow@postgres-readonly/airflow" + default: ~ sql_alchemy_engine_args: description: | Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 8dc48acf314..556a3e6449a 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -38,7 +38,7 @@ from airflow.serialization.serde import deserialize, serialize from airflow.settings import json from airflow.triggers.deadline import PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.session import provide_session +from airflow.utils.session import provide_session, provide_readonly_session from airflow.utils.sqlalchemy import UtcDateTime if TYPE_CHECKING: @@ -356,7 +356,7 @@ class ReferenceModels: DeadlineReferenceType = ReferenceModels.BaseDeadlineReference -@provide_session +@provide_readonly_session def _fetch_from_db(model_reference: Column, session=None, **conditions) -> datetime: """ Fetch a datetime value from the database using the provided model reference and filtering conditions. 
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 6444d5720de..fe3fe05c821 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -48,7 +48,7 @@ from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.serialized_objects import SerializedDAG from airflow.settings import COMPRESS_SERIALIZED_DAGS, json from airflow.utils.hashlib_wrapper import md5 -from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.session import NEW_SESSION, NEW_READONLY_SESSION, provide_session, provide_readonly_session from airflow.utils.sqlalchemy import UtcDateTime if TYPE_CHECKING: @@ -472,9 +472,9 @@ class SerializedDagModel(Base): return select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1) @classmethod - @provide_session + @provide_readonly_session def get_latest_serialized_dags( - cls, *, dag_ids: list[str], session: Session = NEW_SESSION + cls, *, dag_ids: list[str], session: Session = NEW_READONLY_SESSION ) -> list[SerializedDagModel]: """ Get the latest serialized dags of given DAGs. @@ -501,8 +501,8 @@ class SerializedDagModel(Base): return latest_serdags or [] @classmethod - @provide_session - def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDAG]: + @provide_readonly_session + def read_all_dags(cls, session: Session = NEW_READONLY_SESSION) -> dict[str, SerializedDAG]: """ Read all DAGs in serialized_dag table. 
diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index db5ce959b1f..7f0cf1b0dd4 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -89,6 +89,8 @@ SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format") SQL_ALCHEMY_CONN: str | None = None SQL_ALCHEMY_CONN_ASYNC: str | None = None +SQL_ALCHEMY_CONN_READONLY: str | None = None +SQL_ALCHEMY_CONN_READONLY_ASYNC: str | None = None PLUGINS_FOLDER: str | None = None LOGGING_CLASS_PATH: str | None = None DONOT_MODIFY_HANDLERS: bool | None = None @@ -111,6 +113,13 @@ NonScopedSession: sessionmaker async_engine: AsyncEngine AsyncSession: Callable[..., SAAsyncSession] +# Read-only database connection components +readonly_engine: Engine +ReadOnlySession: scoped_session +NonScopedReadOnlySession: sessionmaker +readonly_async_engine: AsyncEngine +ReadOnlyAsyncSession: Callable[..., SAAsyncSession] + # The JSON library to use for DAG Serialization and De-Serialization json = json @@ -222,6 +231,8 @@ def configure_vars(): """Configure Global Variables from airflow.cfg.""" global SQL_ALCHEMY_CONN global SQL_ALCHEMY_CONN_ASYNC + global SQL_ALCHEMY_CONN_READONLY + global SQL_ALCHEMY_CONN_READONLY_ASYNC global DAGS_FOLDER global PLUGINS_FOLDER global DONOT_MODIFY_HANDLERS @@ -232,6 +243,19 @@ def configure_vars(): else: SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN) + # Configure read-only database connections + if conf.has_option("database", "sql_alchemy_conn_readonly"): + SQL_ALCHEMY_CONN_READONLY = conf.get("database", "sql_alchemy_conn_readonly") + else: + SQL_ALCHEMY_CONN_READONLY = SQL_ALCHEMY_CONN + + if conf.has_option("database", "sql_alchemy_conn_readonly_async"): + SQL_ALCHEMY_CONN_READONLY_ASYNC = conf.get("database", "sql_alchemy_conn_readonly_async") + elif SQL_ALCHEMY_CONN_READONLY: + SQL_ALCHEMY_CONN_READONLY_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN_READONLY) + else: + 
SQL_ALCHEMY_CONN_READONLY_ASYNC = SQL_ALCHEMY_CONN_ASYNC + DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins")) @@ -360,6 +384,31 @@ def _configure_async_session() -> None: ) +def _configure_readonly_async_session() -> None: + """ + Configure read-only async SQLAlchemy session. + + This exists so tests can reconfigure the session. How SQLAlchemy configures + this does not work well with Pytest and you can end up with issues when the + session runs in a different event loop from the test itself. + """ + global ReadOnlyAsyncSession + global readonly_async_engine + + readonly_async_engine = create_async_engine( + SQL_ALCHEMY_CONN_READONLY_ASYNC, + connect_args=_get_connect_args("async"), + future=True, + ) + ReadOnlyAsyncSession = sessionmaker( + bind=readonly_async_engine, + autocommit=False, + autoflush=False, + class_=SAAsyncSession, + expire_on_commit=False, + ) + + def configure_orm(disable_connection_pool=False, pool_class=None): """Configure ORM using SQLAlchemy.""" from airflow.sdk.execution_time.secrets_masker import mask_secret @@ -375,11 +424,16 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global NonScopedSession global Session global engine + global NonScopedReadOnlySession + global ReadOnlySession + global readonly_engine if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": # Skip DB initialization in unit tests, if DB tests are skipped Session = SkipDBTestsSession + ReadOnlySession = SkipDBTestsSession engine = None + readonly_engine = None return log.debug("Setting up DB connection pool (PID %s)", os.getpid()) engine_args = prepare_engine_args(disable_connection_pool, pool_class) @@ -401,6 +455,22 @@ def configure_orm(disable_connection_pool=False, pool_class=None): mask_secret(engine.url.password) setup_event_handlers(engine) + # Configure read-only database connections + readonly_connect_args = 
_get_connect_args("sync") + if SQL_ALCHEMY_CONN_READONLY.startswith("sqlite"): + readonly_connect_args["check_same_thread"] = False + + readonly_engine_args = prepare_engine_args(disable_connection_pool, pool_class) + readonly_engine = create_engine( + SQL_ALCHEMY_CONN_READONLY, + connect_args=readonly_connect_args, + **readonly_engine_args, + future=True, + ) + _configure_readonly_async_session() + mask_secret(readonly_engine.url.password) + setup_event_handlers(readonly_engine) + if conf.has_option("database", "sql_alchemy_session_maker"): _session_maker = conf.getimport("database", "sql_alchemy_session_maker") else: @@ -412,6 +482,10 @@ def configure_orm(disable_connection_pool=False, pool_class=None): ) NonScopedSession = _session_maker(engine) Session = scoped_session(NonScopedSession) + + # Configure read-only sessions + NonScopedReadOnlySession = _session_maker(readonly_engine) + ReadOnlySession = scoped_session(NonScopedReadOnlySession) if register_at_fork := getattr(os, "register_at_fork", None): # https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork @@ -421,6 +495,10 @@ def configure_orm(disable_connection_pool=False, pool_class=None): engine.dispose(close=False) if async_engine := _globals.get("async_engine"): async_engine.sync_engine.dispose(close=False) + if readonly_engine := _globals.get("readonly_engine"): + readonly_engine.dispose(close=False) + if readonly_async_engine := _globals.get("readonly_async_engine"): + readonly_async_engine.sync_engine.dispose(close=False) # Won't work on Windows register_at_fork(after_in_child=clean_in_fork) @@ -515,9 +593,15 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None): def dispose_orm(do_log: bool = True): """Properly close pooled database connections.""" global Session, engine, NonScopedSession + global ReadOnlySession, readonly_engine, NonScopedReadOnlySession _globals = globals() - if _globals.get("engine") is None and 
_globals.get("Session") is None: + if ( + _globals.get("engine") is None + and _globals.get("Session") is None + and _globals.get("readonly_engine") is None + and _globals.get("ReadOnlySession") is None + ): return if do_log: @@ -531,10 +615,19 @@ def dispose_orm(do_log: bool = True): NonScopedSession = None close_all_sessions() + if "ReadOnlySession" in _globals and ReadOnlySession is not None: + ReadOnlySession.remove() + ReadOnlySession = None + NonScopedReadOnlySession = None + if "engine" in _globals and engine is not None: engine.dispose() engine = None + if "readonly_engine" in _globals and readonly_engine is not None: + readonly_engine.dispose() + readonly_engine = None + def reconfigure_orm(disable_connection_pool=False, pool_class=None): """Properly close database connections and re-configure ORM.""" diff --git a/airflow-core/src/airflow/utils/session.py b/airflow-core/src/airflow/utils/session.py index 211c1645320..321387af68d 100644 --- a/airflow-core/src/airflow/utils/session.py +++ b/airflow-core/src/airflow/utils/session.py @@ -48,6 +48,26 @@ def create_session(scoped: bool = True) -> Generator[SASession, None, None]: session.close() [email protected] +def create_readonly_session(scoped: bool = True) -> Generator[SASession, None, None]: + """Contextmanager that will create and teardown a read-only session.""" + if scoped: + ReadOnlySession = getattr(settings, "ReadOnlySession", None) + else: + ReadOnlySession = getattr(settings, "NonScopedReadOnlySession", None) + if ReadOnlySession is None: + raise RuntimeError("ReadOnlySession must be set before!") + session = ReadOnlySession() + try: + yield session + # Note: No commit() for read-only sessions + except Exception: + session.rollback() + raise + finally: + session.close() + + @contextlib.asynccontextmanager async def create_session_async(): """ @@ -66,6 +86,24 @@ async def create_session_async(): raise [email protected] +async def create_readonly_session_async(): + """ + Context manager to create 
async read-only session. + + :meta private: + """ + from airflow.settings import ReadOnlyAsyncSession + + async with ReadOnlyAsyncSession() as session: + try: + yield session + # Note: No commit() for read-only sessions + except Exception: + await session.rollback() + raise + + PS = ParamSpec("PS") RT = TypeVar("RT") @@ -102,8 +140,36 @@ def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]: return wrapper +def provide_readonly_session(func: Callable[PS, RT]) -> Callable[PS, RT]: + """ + Provide a read-only session if it isn't provided. + + This decorator is intended for read-only operations that can benefit from + using a read-only database replica to offload traffic from the master database. + If you want to reuse a session or run the function as part of a transaction, + you pass it to the function, if not this wrapper will create a read-only + session and close it for you. + """ + session_args_idx = find_session_idx(func) + + @wraps(func) + def wrapper(*args, **kwargs) -> RT: + if "session" in kwargs or session_args_idx < len(args): + return func(*args, **kwargs) + with create_readonly_session() as session: + return func(*args, session=session, **kwargs) + + return wrapper + + # A fake session to use in functions decorated by provide_session. This allows # the 'session' argument to be of type Session instead of Session | None, # making it easier to type hint the function body without dealing with the None # case that can never happen at runtime. NEW_SESSION: SASession = cast("SASession", None) + +# A fake read-only session to use in functions decorated by provide_readonly_session. +# This allows the 'session' argument to be of type Session instead of Session | None, +# making it easier to type hint the function body without dealing with the None +# case that can never happen at runtime. +NEW_READONLY_SESSION: SASession = cast("SASession", None)
