This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 0dda7d44c8c Add configurable LRU+TTL caching for API server DAG retrieval (#60804) (#66862)
0dda7d44c8c is described below

commit 0dda7d44c8c9949280c9a80a2d2b41c4ccbcba3e
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 18:51:24 2026 +0530

    Add configurable LRU+TTL caching for API server DAG retrieval (#60804) (#66862)
    
    Fixes memory growth in long-running API servers by adding bounded LRU+TTL caching to `DBDagBag`. Previously, the internal dict cache never expired and never evicted, causing memory to grow indefinitely as DAG versions accumulated (~500 MB/day with 100+ DAGs updating daily).
    
    Two new `[api]` config options control caching:
    
    | Config | Default | Description |
    |--------|---------|-------------|
    | `dag_cache_size` | `64` | Max cached DAG versions (0 = unbounded dict, no eviction) |
    | `dag_cache_ttl` | `3600` | TTL in seconds (0 = LRU only, no time-based expiry) |
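
    A minimal sketch of how the two options select a cache mode, mirroring the branching in `create_dag_bag()` further down this diff. `choose_cache_mode` is a hypothetical helper introduced only for illustration; it returns a label instead of constructing a `DBDagBag`.

```python
from __future__ import annotations


def choose_cache_mode(cache_size: int, cache_ttl: int) -> tuple[str, int | None]:
    """Hypothetical helper mirroring create_dag_bag(); returns (mode, ttl)."""
    # Negative values are clamped to 0, as create_dag_bag() does (after a warning).
    cache_size = max(cache_size, 0)
    cache_ttl = max(cache_ttl, 0)
    # dag_cache_size = 0: plain unbounded dict, no eviction and no lock.
    if cache_size == 0:
        return ("dict", None)
    # dag_cache_ttl = 0: size-bounded LRU without time-based expiry.
    if cache_ttl == 0:
        return ("lru", None)
    # Both positive: LRU bound plus a per-entry TTL.
    return ("ttl", cache_ttl)


print(choose_cache_mode(64, 3600))  # ('ttl', 3600)
print(choose_cache_mode(0, 3600))  # ('dict', None)
print(choose_cache_mode(64, 0))  # ('lru', None)
```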
    
    **API server only.** The scheduler continues using a plain unbounded dict with zero lock overhead (`nullcontext` instead of `RLock`). The bounded cache + lock is only created when `cache_size > 0`.
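
    The `nullcontext`/`RLock` switch can be illustrated with a small sketch. `GuardedStore` is a hypothetical class, not part of Airflow; it only shows that both objects work as `with` targets, so one code path serves the locked and the lock-free cases.

```python
from __future__ import annotations

from contextlib import nullcontext
from threading import RLock


class GuardedStore:
    """Hypothetical store that only pays for a lock when it is bounded."""

    def __init__(self, bounded: bool) -> None:
        self._data: dict[str, object] = {}
        # RLock() for the shared, self-mutating bounded cache; nullcontext()
        # is a no-op context manager, so the unbounded path skips locking.
        self._lock = RLock() if bounded else nullcontext()

    def put(self, key: str, value: object) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str) -> object | None:
        with self._lock:
            return self._data.get(key)


scheduler_style = GuardedStore(bounded=False)
api_server_style = GuardedStore(bounded=True)
for store in (scheduler_style, api_server_style):
    store.put("v1", "dag")
    print(type(store._lock).__name__, store.get("v1"))
```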
    
    **Cache thrashing prevention.** `iter_all_latest_version_dags()` (used by the DAG listing endpoint) bypasses the cache entirely. Without this, every DAG listing request would flush the hot working set and replace it with a full scan of all DAGs.
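
    To see why a full scan through a bounded LRU flushes the hot working set, here is a sketch using a tiny `OrderedDict`-based LRU as an illustrative stand-in for `cachetools.LRUCache`. `TinyLRU` and `list_all_dags` are hypothetical names.

```python
from __future__ import annotations

from collections import OrderedDict


class TinyLRU:
    """Minimal LRU map (an illustrative stand-in for cachetools.LRUCache)."""

    def __init__(self, maxsize: int) -> None:
        self.maxsize = maxsize
        self._data: OrderedDict[str, str] = OrderedDict()

    def get(self, key: str) -> str | None:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value: str) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict the least recently used

    def keys(self) -> list[str]:
        return list(self._data)


def list_all_dags(cache: TinyLRU, dag_ids: list[str], populate_cache: bool) -> int:
    """Walk every DAG once, optionally writing each one into the cache."""
    for dag_id in dag_ids:
        dag = f"deserialized:{dag_id}"  # stands in for deserialization
        if populate_cache:
            cache.put(dag_id, dag)
    return len(dag_ids)


all_dags = [f"dag_{i}" for i in range(10)]

thrashed = TinyLRU(maxsize=2)
bypassed = TinyLRU(maxsize=2)
for hot in ("dag_a", "dag_b"):
    thrashed.put(hot, f"deserialized:{hot}")
    bypassed.put(hot, f"deserialized:{hot}")

list_all_dags(thrashed, all_dags, populate_cache=True)
list_all_dags(bypassed, all_dags, populate_cache=False)

print(thrashed.keys())  # ['dag_8', 'dag_9']: hot set flushed by the scan
print(bypassed.keys())  # ['dag_a', 'dag_b']: hot set intact
```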
    
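    **Double-checked locking.** When multiple threads miss on the same `version_id` concurrently, each may query the DB (the query runs outside the lock), but after the query each thread re-checks the cache under the lock. If another thread has already cached the DAG, it returns that entry instead of deserializing and caching it again. Metrics are emitted correctly: a single lookup never counts as both a hit and a miss.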
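
    A sketch of the double-checked lookup pattern, written against a plain dict with hypothetical names (`DoubleCheckedCache`, `slow_load`). As in the diff, the slow load runs outside the lock and the miss counter is bumped only after the second check, so every lookup is counted exactly once as either a hit or a miss.

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class DoubleCheckedCache:
    """Illustrative double-checked lookup; `load` stands in for the DB query."""

    def __init__(self, load: Callable[[str], str]) -> None:
        self._load = load
        self._data: dict[str, tuple[str, str]] = {}
        self._lock = threading.RLock()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> tuple[str, str]:
        # First check under the lock.
        with self._lock:
            value = self._data.get(key)
            if value is not None:
                self.hits += 1
                return value
        # The slow load runs without holding the lock.
        raw = self._load(key)
        # Second check: another thread may have cached the key meanwhile.
        with self._lock:
            value = self._data.get(key)
            if value is not None:
                self.hits += 1
                return value
            self.misses += 1  # confirmed miss, counted once per lookup
        value = ("deserialized", raw)  # "deserialize" outside the lock
        with self._lock:
            self._data[key] = value
        return value


def slow_load(key: str) -> str:
    time.sleep(0.01)  # simulate DB latency
    return key


cache = DoubleCheckedCache(slow_load)
keys = [f"v{i % 5}" for i in range(100)]
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(cache.get, keys))
print(cache.hits, cache.misses)  # the two always sum to 100
```

    Because the load runs without the lock, threads that miss the same key concurrently may each perform it; the second check lets late arrivals reuse an entry that an earlier thread has already stored.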
    
    **Separate model lookup.** `get_serialized_dag_model()` does not use the LRU/TTL cache and always queries the DB. The triggerer needs the full `SerializedDagModel` (for `.data`), not the deserialized `SerializedDAG` stored in the LRU/TTL cache.
    
    **Cache keying.** The cache is keyed by DAG version ID. Lookups by `dag_id` (e.g., viewing a DAG's details) always query the DB for the latest version, but the deserialized result is cached for subsequent version-specific lookups (e.g., task instance views for a specific DAG run).
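
    A sketch of the keying rule, with dicts standing in for the database. `LATEST_VERSION`, `SERIALIZED`, `get_latest`, and `get_by_version` are hypothetical names: a lookup by DAG ID always goes to the "DB", while the result is stored under its version ID so that a later version-specific lookup is served from the cache.

```python
from __future__ import annotations

# Hypothetical stand-ins for the database tables.
LATEST_VERSION = {"etl": "v2"}  # dag_id -> latest version id
SERIALIZED = {"v1": "etl@v1", "v2": "etl@v2"}  # version id -> DAG payload

cache: dict[str, str] = {}  # keyed by version id only
db_queries: list[str] = []


def get_latest(dag_id: str) -> str:
    """Lookup by DAG ID: always resolved against the "DB"."""
    db_queries.append(f"latest:{dag_id}")
    version_id = LATEST_VERSION[dag_id]
    dag = SERIALIZED[version_id]  # stands in for fetch + deserialize
    cache[version_id] = dag  # kept for later version-specific lookups
    return dag


def get_by_version(version_id: str) -> str:
    """Version-specific lookup (e.g. a task instance view): cache first."""
    if version_id not in cache:
        db_queries.append(f"version:{version_id}")
        cache[version_id] = SERIALIZED[version_id]
    return cache[version_id]


first = get_latest("etl")  # goes to the DB and caches "v2"
again = get_by_version("v2")  # served from the cache
older = get_by_version("v1")  # not cached yet, so it goes to the DB
print(db_queries)
```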
    
    **Staleness.** After a DAG is updated, the API server may serve the previous version until the cached entry expires (controlled by `dag_cache_ttl`). This is documented in the config description.
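
    The staleness window follows from TTL semantics: an entry keeps being served until its TTL elapses, and only then does the next read miss. A sketch with an injectable clock, using a hypothetical `TinyTTLCache` as an illustrative stand-in for `cachetools.TTLCache` (whose constructor also takes a `timer`, as the tests in this diff use). This sketch refreshes the LRU position on reads but does not extend the expiry time.

```python
from __future__ import annotations

from collections import OrderedDict
from typing import Callable


class TinyTTLCache:
    """Hypothetical LRU map whose entries expire `ttl` seconds after insertion."""

    def __init__(self, maxsize: int, ttl: float, timer: Callable[[], float]) -> None:
        self.maxsize = maxsize
        self.ttl = ttl
        self.timer = timer
        self._data: OrderedDict[str, tuple[float, str]] = OrderedDict()

    def get(self, key: str) -> str | None:
        item = self._data.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self.timer() > expires_at:
            del self._data[key]  # expired: drop it and report a miss
            return None
        self._data.move_to_end(key)  # refresh LRU position, not the expiry
        return value

    def put(self, key: str, value: str) -> None:
        self._data[key] = (self.timer() + self.ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict least recently used


now = [0.0]  # fake clock, advanced by hand
cache = TinyTTLCache(maxsize=64, ttl=3600, timer=lambda: now[0])
cache.put("v1", "cached copy of v1")

now[0] = 10.0
within_ttl = cache.get("v1")  # still served from the cache, even if the DB changed
now[0] = 3601.0
after_ttl = cache.get("v1")  # None: the caller must re-fetch from the DB
```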
    
    **Why `cachetools`.** `cachetools` is a small, pure-Python library (~1K LOC) already present as a transitive dependency via `google-auth`. It provides battle-tested `LRUCache` and `TTLCache` implementations. Pinned at `>=6.0.0` to match the FAB provider.
    
    **Why `RLock`.** `cachetools` caches are NOT thread-safe -- `.get()` mutates internal doubly-linked lists (LRU reordering) and TTL access triggers cleanup. Without synchronization, concurrent access can corrupt the data structure.
    
    | Metric | Type | Description |
    |--------|------|-------------|
    | `api_server.dag_bag.cache_hit` | Counter | Cache hits (including double-checked locking hits) |
    | `api_server.dag_bag.cache_miss` | Counter | Confirmed misses (after double-check) |
    | `api_server.dag_bag.cache_clear` | Counter | Cache clears |
    | `api_server.dag_bag.cache_size` | Gauge | Current cache size (sampled at 10%) |
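
    Because every lookup is counted exactly once as either `cache_hit` or `cache_miss`, the two counters can be combined into a hit ratio over any time window. A minimal sketch, assuming the counter deltas are read from your metrics backend; `hit_ratio` is a hypothetical helper:

```python
from __future__ import annotations


def hit_ratio(cache_hit: int, cache_miss: int) -> float | None:
    """Hypothetical helper: share of lookups served from the cache."""
    total = cache_hit + cache_miss
    if total == 0:
        return None  # no lookups in this window
    return cache_hit / total


print(hit_ratio(cache_hit=900, cache_miss=100))  # 0.9
```

    A persistently low ratio under steady traffic may suggest that `dag_cache_size` is smaller than the working set, or that `dag_cache_ttl` is short relative to how often the same versions are requested.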
    
    - Default behavior unchanged for scheduler and triggerer (unbounded dict, no lock)
    - API server gets caching by default (`dag_cache_size=64`, `dag_cache_ttl=3600`)
    - Use `dag_cache_size=0` to restore pre-change behavior (unbounded dict)
    - No breaking changes to public APIs; `get_serialized_dag_model()` and `get_dag()` signatures preserved
    
    - #64326 (closed) -- similar fix with OrderedDict-based LRU, no TTL
    - #60940 (merged) -- gunicorn support with rolling worker restarts (complementary, handles memory growth from any source)
    
    (cherry picked from commit 26cbdcbe948c105322fee64064b24697f03b9dc1)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 .../administration-and-deployment/web-stack.rst    |   6 +
 airflow-core/docs/faq.rst                          |  29 ++-
 airflow-core/newsfragments/60804.feature.rst       |   1 +
 airflow-core/pyproject.toml                        |   1 +
 .../src/airflow/api_fastapi/common/dagbag.py       |  25 +-
 .../src/airflow/config_templates/config.yml        |  25 ++
 airflow-core/src/airflow/models/dagbag.py          | 114 ++++++++-
 .../tests/unit/api_fastapi/common/test_dagbag.py   |  28 +++
 airflow-core/tests/unit/models/test_dagbag.py      | 279 ++++++++++++++++++++-
 .../observability/metrics/metrics_template.yaml    |  24 ++
 uv.lock                                            |   2 +
 11 files changed, 519 insertions(+), 15 deletions(-)

diff --git a/airflow-core/docs/administration-and-deployment/web-stack.rst b/airflow-core/docs/administration-and-deployment/web-stack.rst
index 146ee6952c2..fd32e75db55 100644
--- a/airflow-core/docs/administration-and-deployment/web-stack.rst
+++ b/airflow-core/docs/administration-and-deployment/web-stack.rst
@@ -142,6 +142,8 @@ The following configuration options are available in the ``[api]`` section:
 - ``server_type``: ``uvicorn`` (default) or ``gunicorn``
 - ``worker_refresh_interval``: Seconds between worker refresh cycles (0 = disabled, default)
 - ``worker_refresh_batch_size``: Number of workers to refresh per cycle (default: 1)
+- ``dag_cache_size``: Max cached SerializedDAG versions in the API server (default: 64, 0 = unbounded)
+- ``dag_cache_ttl``: TTL in seconds for cached DAGs (default: 3600, 0 = LRU only)
 - ``reload_on_plugin_change``: Reload when plugin files change (default: False)
 
 When to Use Gunicorn
@@ -187,6 +189,10 @@ For example, to trigger a rolling restart of the API server pods:
 
    kubectl rollout restart deployment airflow-api-server
 
+The API server also supports bounded DAG caching via ``dag_cache_size`` and
+``dag_cache_ttl``, which limits memory consumed by cached SerializedDAG objects.
+This reduces memory growth from DAG version accumulation regardless of server type.
+
 In many Kubernetes environments, relying solely on Kubernetes OOM kills or
 crash restarts is not recommended, as memory growth may not always trigger an
 OOM event. For production deployments that require automatic worker recycling
diff --git a/airflow-core/docs/faq.rst b/airflow-core/docs/faq.rst
index 76ed8328814..965ae2c25e2 100644
--- a/airflow-core/docs/faq.rst
+++ b/airflow-core/docs/faq.rst
@@ -698,8 +698,28 @@ How to prevent API server memory growth?
 The API server caches serialized Dag objects in memory. Over time, as Dag versions accumulate
 (see :ref:`faq:dag-version-inflation`), this cache grows and can consume several gigabytes of memory.
 
-The recommended solution (available since Airflow 3.2.0) is to use **gunicorn** with **rolling worker
-restarts**. Gunicorn periodically recycles worker processes, releasing all accumulated memory. It also
+There are two complementary approaches:
+
+**1. Bounded DAG caching (available since Airflow 3.2.2)**
+
+The API server supports LRU+TTL caching that bounds how many serialized Dag versions are kept
+in memory. Configure this in the ``[api]`` section:
+
+.. code-block:: ini
+
+    [api]
+    ; max cached versions (0 = unbounded, pre-3.2.2 behavior)
+    dag_cache_size = 64
+    ; seconds before a cached entry expires (0 = LRU only)
+    dag_cache_ttl = 3600
+
+The cache is keyed by Dag version ID. After a Dag is updated, the API server may serve the
+previous version until the cached entry expires (controlled by ``dag_cache_ttl``).
+
+See the ``[api] dag_cache_size`` and ``[api] dag_cache_ttl`` options in the configuration
+reference for full details.
+
+**2. Gunicorn with rolling worker restarts (available since Airflow 3.2.0)**
+
+Gunicorn periodically recycles worker processes, releasing all accumulated memory. It also
 uses ``preload`` + ``fork``, so workers share read-only memory pages via copy-on-write, reducing overall
 memory usage by 40-50% compared to uvicorn's multiprocess mode.
 
@@ -720,8 +740,9 @@ See :ref:`config:api__server_type`, :ref:`config:api__worker_refresh_interval`,
 
 .. note::
 
-    Worker recycling handles memory growth from *any* source, not just the Dag cache. It is the
-    recommended approach for production API server deployments.
+    Worker recycling handles memory growth from *any* source, not just the Dag cache.
+    For production deployments, using both bounded caching and gunicorn worker recycling
+    provides the best results.
 
 
 MySQL and MySQL variant Databases
diff --git a/airflow-core/newsfragments/60804.feature.rst b/airflow-core/newsfragments/60804.feature.rst
new file mode 100644
index 00000000000..7b75d02ede6
--- /dev/null
+++ b/airflow-core/newsfragments/60804.feature.rst
@@ -0,0 +1 @@
+Add configurable LRU+TTL caching for API server DAG retrieval via ``dag_cache_size`` and ``dag_cache_ttl`` config options in the ``[api]`` section. This bounds memory growth from accumulated SerializedDAG objects in long-running API server processes.
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index 3d7ced88f74..1b8091c1e20 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -68,6 +68,7 @@ version = "3.2.1"
 
 dependencies = [
     "a2wsgi>=1.10.8",
+    "cachetools>=6.0.0",
     # aiosqlite 0.22.0 has a problem with hanging pytest sessions and we excluded it
     # See https://github.com/omnilib/aiosqlite/issues/369
     # It seems that while our test issues are fixed in 0.22.1, sqlalchemy 2 itself
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index 01cfcf09f69..05bc9b08d9a 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -16,21 +16,42 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 from typing import TYPE_CHECKING, Annotated
 
 from fastapi import Depends, HTTPException, Request, status
 from sqlalchemy.orm import Session
 
+from airflow.configuration import conf
 from airflow.models.dagbag import DBDagBag
 
 if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)
+
 
 def create_dag_bag() -> DBDagBag:
-    """Create DagBag to retrieve DAGs from the database."""
-    return DBDagBag()
+    """Create DagBag with configurable LRU+TTL caching for API server usage."""
+    cache_size = conf.getint("api", "dag_cache_size", fallback=64)
+    cache_ttl_config = conf.getint("api", "dag_cache_ttl", fallback=3600)
+
+    if cache_size < 0:
+        log.warning("dag_cache_size must be >= 0, using unbounded dict")
+        cache_size = 0
+    if cache_ttl_config < 0:
+        log.warning("dag_cache_ttl must be >= 0, disabling TTL")
+        cache_ttl_config = 0
+
+    # Use unbounded dict (no eviction) if cache_size is 0
+    if cache_size <= 0:
+        return DBDagBag(cache_size=0)
+
+    # Disable TTL if cache_ttl is 0
+    cache_ttl: int | None = cache_ttl_config if cache_ttl_config > 0 else None
+
+    return DBDagBag(cache_size=cache_size, cache_ttl=cache_ttl)
 
 
 def dag_bag_from_app(request: Request) -> DBDagBag:
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 8008452c50f..99eb360ab72 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1596,6 +1596,31 @@ api:
       type: string
       example: ~
       default: "False"
+    dag_cache_size:
+      description: |
+        Size of the LRU cache for SerializedDAG objects in the API server.
+        Set to 0 to use an unbounded dict (no eviction, pre-3.2.2 behavior).
+        The cache is keyed by DAG version ID, so lookups by DAG ID
+        (e.g., viewing a DAG's details) always query the database for the
+        latest version, but the deserialized result is cached for subsequent
+        version-specific lookups.
+      version_added: 3.2.2
+      type: integer
+      example: ~
+      default: "64"
+    dag_cache_ttl:
+      description: |
+        Time-to-live (seconds) for cached SerializedDAG objects in the API server.
+        After this time, cached DAGs will be re-fetched from the database on next access.
+        Set to 0 to disable TTL (cache entries will only be evicted by LRU policy).
+
+        Note: After a DAG is updated, the API server may serve the previous version
+        until the cached entry expires. Lower values reduce staleness but increase
+        database load.
+      version_added: 3.2.2
+      type: integer
+      example: ~
+      default: "3600"
     base_url:
       description: |
         The base url of the API server. Airflow cannot guess what domain or CNAME you are using.
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index e04f77d06df..63059884a95 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -18,12 +18,17 @@
 from __future__ import annotations
 
 import hashlib
+from collections.abc import MutableMapping
+from contextlib import nullcontext
+from threading import RLock
 from typing import TYPE_CHECKING, Any
 from uuid import UUID
 
+from cachetools import LRUCache, TTLCache
 from sqlalchemy import String, select
 from sqlalchemy.orm import Mapped, joinedload, mapped_column
 
+from airflow._shared.observability.metrics.stats import Stats
 from airflow.models.base import Base, StringID
 from airflow.models.dag_version import DagVersion
 
@@ -39,31 +44,118 @@ if TYPE_CHECKING:
 
 class DBDagBag:
     """
-    Internal class for retrieving and caching dags in the scheduler.
+    Internal class for retrieving dags from the database.
+
+    Optionally supports LRU+TTL caching when cache_size is provided.
+    The scheduler uses this without caching, while the API server can
+    enable caching via configuration.
 
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDAG] = {}  # dag_version_id to dag
+    def __init__(
+        self,
+        load_op_links: bool = True,
+        cache_size: int | None = None,
+        cache_ttl: int | None = None,
+    ) -> None:
+        """
+        Initialize DBDagBag.
+
+        :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+        :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
+        :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
+        """
         self.load_op_links = load_op_links
+        self._dags: MutableMapping[UUID | str, SerializedDAG] = {}
+        self._use_cache = False
+
+        # Initialize bounded cache if cache_size is provided and > 0
+        if cache_size and cache_size > 0:
+            if cache_ttl and cache_ttl > 0:
+                self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+            else:
+                self._dags = LRUCache(maxsize=cache_size)
+            self._use_cache = True
+
+        # Lock required for bounded caches: cachetools caches are NOT thread-safe
+        # (LRU reordering and TTL cleanup mutate internal linked lists).
+        # nullcontext for unbounded dict avoids lock overhead in the scheduler path.
+        self._lock: RLock | nullcontext = RLock() if self._use_cache else nullcontext()
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+        """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
         serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
+        dag = serdag.dag
+        if not dag:
+            return None
+        with self._lock:
             self._dags[serdag.dag_version_id] = dag
+            cache_size = len(self._dags)
+        if self._use_cache:
+            Stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
         return dag
 
-    def _get_dag(self, version_id: UUID, session: Session) -> SerializedDAG | None:
-        if dag := self._dags.get(version_id):
+    def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | None:
+        # Check cache first
+        with self._lock:
+            dag = self._dags.get(version_id)
+
+        if dag:
+            if self._use_cache:
+                Stats.incr("api_server.dag_bag.cache_hit")
             return dag
+
         dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
         if not dag_version:
             return None
         if not (serdag := dag_version.serialized_dag):
             return None
+
+        # Double-checked locking: another thread may have cached it while we queried DB.
+        # Only emit the miss metric after confirming no other thread cached it, to avoid
+        # counting a single lookup as both a miss and a hit.
+        if self._use_cache:
+            with self._lock:
+                if dag := self._dags.get(version_id):
+                    Stats.incr("api_server.dag_bag.cache_hit")
+                    return dag
+            Stats.incr("api_server.dag_bag.cache_miss")
         return self._read_dag(serdag)
 
+    def get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | None:
+        """Get a dag by its version id, using cache if enabled."""
+        return self._get_dag(version_id=version_id, session=session)
+
+    def get_serialized_dag_model(self, version_id: UUID | str, session: Session) -> SerializedDagModel | None:
+        """
+        Return the SerializedDagModel for a given dag version id.
+
+        Always queries the database. The triggerer needs the full model
+        for ``serialized_dag_model.data``, which cannot be stored in the
+        LRU/TTL cache (it stores deserialized SerializedDAG objects).
+        """
+        dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
+        if not dag_version or not (serdag := dag_version.serialized_dag):
+            return None
+        serdag.load_op_links = self.load_op_links
+        return serdag
+
+    def clear_cache(self) -> int:
+        """
+        Clear all cached DAGs.
+
+        :return: Number of entries cleared from the DAG cache.
+        """
+        with self._lock:
+            count = len(self._dags)
+            self._dags.clear()
+
+        if self._use_cache:
+            Stats.incr("api_server.dag_bag.cache_clear")
+            Stats.gauge("api_server.dag_bag.cache_size", 0)
+        return count
+
     @staticmethod
     def _version_from_dag_run(dag_run: DagRun, *, session: Session) -> UUID | None:
         if not dag_run.bundle_version:
@@ -78,11 +170,17 @@ class DBDagBag:
         return None
 
     def iter_all_latest_version_dags(self, *, session: Session) -> Generator[SerializedDAG, None, None]:
-        """Walk through all latest version dags available in the database."""
+        """
+        Walk through all latest version dags available in the database.
+
+        Note: This method does NOT cache the DAGs to avoid cache thrashing when
+        iterating over many DAGs. Each DAG is deserialized fresh from the database.
+        """
         from airflow.models.serialized_dag import SerializedDagModel
 
         for sdm in session.scalars(select(SerializedDagModel)):
-            if dag := self._read_dag(sdm):
+            sdm.load_op_links = self.load_op_links
+            if dag := sdm.dag:
                 yield dag
 
     def get_latest_version_of_dag(self, dag_id: str, *, session: Session) -> SerializedDAG | None:
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
index 27f34064e5f..cff9576beea 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from cachetools import LRUCache, TTLCache
 
 from airflow.api_fastapi.app import purge_cached_app
 from airflow.sdk import BaseOperator
@@ -82,3 +83,30 @@ class TestDagBagSingleton:
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestCreateDagBag:
+    """Tests for create_dag_bag() function."""
+
+    @pytest.mark.parametrize(
+        ("cache_size", "cache_ttl", "expected_use_cache", "expected_dags_type"),
+        [
+            pytest.param(64, 3600, True, TTLCache, id="default_ttl_cache"),
+            pytest.param(0, 3600, False, dict, id="size_zero_unbounded"),
+            pytest.param(64, 0, True, LRUCache, id="ttl_zero_lru_only"),
+        ],
+    )
+    @mock.patch("airflow.api_fastapi.common.dagbag.conf")
+    def test_create_dag_bag_cache_modes(
+        self, mock_conf, cache_size, cache_ttl, expected_use_cache, expected_dags_type
+    ):
+        from airflow.api_fastapi.common.dagbag import create_dag_bag
+
+        mock_conf.getint.side_effect = lambda section, key, fallback: {
+            "dag_cache_size": cache_size,
+            "dag_cache_ttl": cache_ttl,
+        }.get(key, fallback)
+
+        dag_bag = create_dag_bag()
+        assert dag_bag._use_cache is expected_use_cache
+        assert isinstance(dag_bag._dags, expected_dags_type)
diff --git a/airflow-core/tests/unit/models/test_dagbag.py b/airflow-core/tests/unit/models/test_dagbag.py
index 3b5b9887726..654c0593c52 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -16,7 +16,17 @@
 # under the License.
 from __future__ import annotations
 
+import time
+from concurrent.futures import ThreadPoolExecutor
+from unittest.mock import MagicMock, patch
+
 import pytest
+import time_machine
+from cachetools import LRUCache, TTLCache
+
+from airflow.models.dagbag import DBDagBag
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.serialization.definitions.dag import SerializedDAG
 
 pytestmark = pytest.mark.db_test
 
@@ -25,4 +35,271 @@ pytestmark = pytest.mark.db_test
 # the source code reorganization where DagBag moved from models to dag_processing.
 #
 # Tests for models-specific functionality (DBDagBag, DagPriorityParsingRequest, etc.)
-# would remain in this file, but currently no such tests exist.
+# remain in this file.
+
+
+class TestDBDagBag:
+    def setup_method(self):
+        self.db_dag_bag = DBDagBag()
+        self.session = MagicMock()
+
+    def test__read_dag_stores_and_returns_dag(self):
+        """It should store the SerializedDAG in _dags and return it."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
+        mock_serdag.dag_version_id = "v1"
+
+        result = self.db_dag_bag._read_dag(mock_serdag)
+
+        assert result == mock_dag
+        assert self.db_dag_bag._dags["v1"] == mock_dag
+        assert mock_serdag.load_op_links is True
+
+    def test__read_dag_returns_none_when_no_dag(self):
+        """It should return None and not modify _dags when no DAG is present."""
+        mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = None
+        mock_serdag.dag_version_id = "v1"
+
+        result = self.db_dag_bag._read_dag(mock_serdag)
+
+        assert result is None
+        assert "v1" not in self.db_dag_bag._dags
+
+    def test_get_dag_fetches_from_db_on_miss(self):
+        """It should query the DB and cache the result when not in cache."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
+        mock_serdag.dag_version_id = "v1"
+        mock_dag_version = MagicMock()
+        mock_dag_version.serialized_dag = mock_serdag
+        self.session.get.return_value = mock_dag_version
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        self.session.get.assert_called_once()
+        assert result == mock_dag
+
+    def test_get_dag_returns_cached_on_hit(self):
+        """It should return cached DAG without querying DB."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        self.db_dag_bag._dags["v1"] = mock_dag
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result == mock_dag
+        self.session.get.assert_not_called()
+
+    def test_get_dag_returns_none_when_not_found(self):
+        """It should return None if version_id not found in DB."""
+        self.session.get.return_value = None
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result is None
+
+
+class TestDBDagBagCache:
+    """Tests for DBDagBag optional caching behavior."""
+
+    def test_no_caching_by_default(self):
+        """Test that DBDagBag uses a simple dict without caching by default."""
+        dag_bag = DBDagBag()
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_lru_cache_enabled_with_cache_size(self):
+        """Test that LRU cache is enabled when cache_size is provided."""
+        dag_bag = DBDagBag(cache_size=10)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, LRUCache)
+
+    def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+        """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, TTLCache)
+
+    def test_zero_cache_size_uses_unbounded_dict(self):
+        """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+        dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_clear_cache_with_caching(self):
+        """Test clear_cache() with caching enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        dag_bag._dags["version_2"] = mock_dag
+        assert len(dag_bag._dags) == 2
+
+        count = dag_bag.clear_cache()
+        assert count == 2
+        assert len(dag_bag._dags) == 0
+
+    def test_clear_cache_without_caching(self):
+        """Test clear_cache() without caching enabled."""
+        dag_bag = DBDagBag()
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        assert len(dag_bag._dags) == 1
+
+        count = dag_bag.clear_cache()
+        assert count == 1
+        assert len(dag_bag._dags) == 0
+
+    def test_ttl_cache_expiry(self):
+        """Test that cached DAGs expire after TTL."""
+        # TTLCache defaults to time.monotonic which time_machine cannot control.
+        # Use time.time as the timer so time_machine can advance it.
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+        dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+        with time_machine.travel("2025-01-01 00:00:00", tick=False):
+            dag_bag._dags["test_version_id"] = MagicMock()
+            assert "test_version_id" in dag_bag._dags
+
+        # Jump ahead beyond TTL
+        with time_machine.travel("2025-01-01 00:00:02", tick=False):
+            assert dag_bag._dags.get("test_version_id") is None
+
+    def test_lru_eviction(self):
+        """Test that LRU eviction works when cache is full."""
+        dag_bag = DBDagBag(cache_size=2)
+
+        dag_bag._dags["version_1"] = MagicMock()
+        dag_bag._dags["version_2"] = MagicMock()
+        dag_bag._dags["version_3"] = MagicMock()
+
+        # version_1 should be evicted (LRU)
+        assert dag_bag._dags.get("version_1") is None
+        assert dag_bag._dags.get("version_2") is not None
+        assert dag_bag._dags.get("version_3") is not None
+
+    def test_thread_safety_with_caching(self):
+        """Test concurrent access doesn't cause race conditions with caching enabled."""
+        dag_bag = DBDagBag(cache_size=100, cache_ttl=60)
+        errors = []
+        mock_session = MagicMock()
+
+        def make_dag_version(version_id):
+            serdag = MagicMock()
+            serdag.dag = MagicMock()
+            serdag.dag_version_id = version_id
+            return MagicMock(serialized_dag=serdag)
+
+        def get_dag_version(model, version_id, options=None):
+            return make_dag_version(version_id)
+
+        mock_session.get.side_effect = get_dag_version
+
+        def access_cache(i):
+            try:
+                dag_bag._get_dag(f"version_{i % 5}", mock_session)
+            except Exception as e:
+                errors.append(e)
+
+        with ThreadPoolExecutor(max_workers=10) as executor:
+            futures = [executor.submit(access_cache, i) for i in range(100)]
+            for f in futures:
+                f.result()
+
+        assert not errors
+
+    def test_read_dag_stores_in_bounded_cache(self):
+        """Test that _read_dag stores DAG in bounded cache when cache_size > 0."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+
+        result = dag_bag._read_dag(mock_sdm)
+
+        assert result == mock_sdm.dag
+        assert "test_version" in dag_bag._dags
+
+    def test_read_dag_stores_in_unbounded_dict(self):
+        """Test that _read_dag stores DAG in unbounded dict when no cache_size."""
+        dag_bag = DBDagBag()
+
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+
+        result = dag_bag._read_dag(mock_sdm)
+
+        assert result == mock_sdm.dag
+        assert "test_version" in dag_bag._dags
+
+    def test_iter_all_latest_version_dags_does_not_cache(self):
+        """Test that iter_all_latest_version_dags does not cache to prevent thrashing."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_session = MagicMock()
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+        mock_session.scalars.return_value = [mock_sdm]
+
+        list(dag_bag.iter_all_latest_version_dags(session=mock_session))
+
+        # Cache should be empty -- iter doesn't cache to prevent thrashing
+        assert len(dag_bag._dags) == 0
+
+    @patch("airflow.models.dagbag.Stats")
+    def test_cache_hit_metric_emitted(self, mock_stats):
+        """Test that cache hit metric is emitted when caching is enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        mock_session = MagicMock()
+        dag_bag._dags["test_version"] = MagicMock()
+
+        dag_bag._get_dag("test_version", mock_session)
+
+        mock_stats.incr.assert_called_with("api_server.dag_bag.cache_hit")
+
+    @patch("airflow.models.dagbag.Stats")
+    def test_cache_miss_metric_emitted(self, mock_stats):
+        """Test that cache miss metric is emitted when DAG is found in DB but not in cache."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        mock_session = MagicMock()
+
+        # Set up a DB result so _get_dag reaches the miss metric path
+        mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = MagicMock(spec=SerializedDAG)
+        mock_serdag.dag_version_id = "uncached_version"
+        mock_dag_version = MagicMock()
+        mock_dag_version.serialized_dag = mock_serdag
+        mock_session.get.return_value = mock_dag_version
+
+        dag_bag._get_dag("uncached_version", mock_session)
+
+        mock_stats.incr.assert_any_call("api_server.dag_bag.cache_miss")
+
+    @patch("airflow.models.dagbag.Stats")
+    def test_cache_clear_metric_emitted(self, mock_stats):
+        """Test that cache clear metric is emitted when caching is enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        dag_bag._dags["test_version"] = MagicMock()
+
+        dag_bag.clear_cache()
+
+        mock_stats.incr.assert_called_with("api_server.dag_bag.cache_clear")
+
+    @patch("airflow.models.dagbag.Stats")
+    def test_cache_size_gauge_emitted(self, mock_stats):
+        """Test that cache size gauge is emitted when a DAG is cached."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        mock_serdag = MagicMock()
+        mock_serdag.dag_version_id = "test_version_1"
+        mock_serdag.dag = MagicMock()
+        mock_serdag.load_op_links = True
+
+        dag_bag._read_dag(mock_serdag)
+
+        mock_stats.gauge.assert_called_with("api_server.dag_bag.cache_size", 1, rate=0.1)
diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index 0c99dfe5ddb..17356bdcbb0 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -292,6 +292,24 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "api_server.dag_bag.cache_hit"
+    description: "Number of cache hits when retrieving SerializedDAG from DBDagBag in the API server"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "api_server.dag_bag.cache_miss"
+    description: "Number of cache misses when retrieving SerializedDAG from DBDagBag in the API server"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "api_server.dag_bag.cache_clear"
+    description: "Number of times the DBDagBag cache was cleared in the API server"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
   # ==========
   # Gauges
   # ==========
@@ -301,6 +319,12 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "api_server.dag_bag.cache_size"
+    description: "Current number of SerializedDAG objects cached in the API server's DBDagBag"
+    type: "gauge"
+    legacy_name: "-"
+    name_variables: []
+
   - name: "dag_processing.import_errors"
     description: "Number of errors from trying to parse Dag files"
     type: "gauge"
diff --git a/uv.lock b/uv.lock
index 6ce960c99ac..63eacbbc5d9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1767,6 +1767,7 @@ dependencies = [
     { name = "argcomplete" },
     { name = "asgiref" },
     { name = "attrs" },
+    { name = "cachetools" },
     { name = "cadwyn" },
     { name = "colorlog" },
     { name = "cron-descriptor" },
@@ -1893,6 +1894,7 @@ requires-dist = [
     { name = "asgiref", marker = "python_full_version < '3.14'", specifier = ">=2.3.0" },
     { name = "asgiref", marker = "python_full_version >= '3.14'", specifier = ">=3.11.1" },
     { name = "attrs", specifier = ">=22.1.0,!=25.2.0" },
+    { name = "cachetools", specifier = ">=6.0.0" },
     { name = "cadwyn", specifier = ">=6.0.4" },
     { name = "colorlog", specifier = ">=6.8.2" },
     { name = "cron-descriptor", specifier = ">=1.2.24" },

