Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
jscheffl commented on PR #64326: URL: https://github.com/apache/airflow/pull/64326#issuecomment-4229148983 Thanks @dheerajturaga for the efforts anyway! Looking forward to a fix! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
kaxil closed pull request #64326: Fix API server memory leak: bound DBDagBag version cache with LRU eviction URL: https://github.com/apache/airflow/pull/64326
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
kaxil commented on PR #64326: URL: https://github.com/apache/airflow/pull/64326#issuecomment-4227149496 Closing this in favor of implementing it in https://github.com/apache/airflow/pull/60804
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
dheerajturaga commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4226438990
> Compared to Kaxil's version, I believe this implementation isn't thread safe, which is a problem. Can we fill that gap?
>
> Otherwise looking good to me.

Should be thread safe now. Let me know if there are any other objections.
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3066476691
##
shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml:
##
@@ -292,6 +292,24 @@ metrics:
     legacy_name: "-"
     name_variables: []
+  - name: "dag_bag.cache.hits"
+    description: "Number of DBDagBag cache hits (DAG version found in memory)"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "dag_bag.cache.misses"
+    description: "Number of DBDagBag cache misses (DAG version fetched from DB)"
Review Comment:
The implementation increments `dag_bag.cache.misses` before the DB lookup,
including cases where the DB does not contain the requested `DagVersion` /
`SerializedDagModel`. To match behavior, update the description to reflect “not
found in memory (DB lookup performed)” rather than “fetched from DB”.
```suggestion
description: "Number of DBDagBag cache misses (DAG version not found in memory; DB lookup performed)"
```
##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (
Review Comment:
On Python versions before 3.13, `threading.Lock` is a factory function at
runtime (returning a `_thread.lock`), so using `threading.Lock` as a type in a
union is likely to fail strict type-checking. Prefer annotating `_lock` as a
context manager type (e.g., `contextlib.AbstractContextManager[None]` /
`typing.ContextManager[None]`) and assign either `threading.Lock()` or
`contextlib.nullcontext()` to it.
```suggestion
self._lock: contextlib.AbstractContextManager[None] = (
```
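A minimal sketch of the annotation idea above, using a hypothetical `BoundedCache` class rather than the real `DBDagBag`. The type parameter is `Any` here as an assumption of this sketch: `threading.Lock().__enter__()` returns a bool while `contextlib.nullcontext()` yields `None`, so a single `[None]` parameter may not satisfy a strict checker either.

```python
import contextlib
import threading
from typing import Any, Optional


class BoundedCache:
    """Hypothetical stand-in for DBDagBag; only the lock selection is shown."""

    def __init__(self, max_cache_size: Optional[int] = None) -> None:
        # Annotate by behaviour ("usable in a with-statement"), not by concrete class.
        self._lock: contextlib.AbstractContextManager[Any] = (
            threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
        )

    def touch(self) -> bool:
        # Works identically with a real lock and with the no-op context manager.
        with self._lock:
            return True
```

Callers never need to know which of the two objects is behind `_lock`; both support the `with` statement.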
##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (
+            threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
+        )
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            with self._lock:
+                self._dags[version_id] = serialized_dag_model
+                self._dags.move_to_end(version_id)
+                if (
+                    self._max_dag_version_cache_size is not None
+                    and len(self._dags) > self._max_dag_version_cache_size
+                ):
+                    self._dags.popitem(last=False)
+                    Stats.incr("dag_bag.cache.evictions")
+                Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)
Review Comment:
`Stats.incr`/`Stats.gauge` are executed while holding the cache lock,
increasing contention risk if metric emission performs non-trivial work (or
blocks internally). Capture the eviction flag and current size under the lock,
then emit metrics after releasing the lock to keep the critical section minimal.
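One way the suggestion could look, as a rough sketch only. `LRUCache` and its `emitted` list are hypothetical stand-ins (the list replaces the real `Stats.incr` / `Stats.gauge` calls), and the sketch assumes the size gauge is reported on every insert.

```python
import threading
from collections import OrderedDict
from typing import Any, Hashable, List, Optional, Tuple


class LRUCache:
    """Hypothetical bounded LRU cache; `emitted` stands in for Stats calls."""

    def __init__(self, max_size: Optional[int] = None) -> None:
        self._max_size = max_size
        self._items: "OrderedDict[Hashable, Any]" = OrderedDict()
        self._lock = threading.Lock()
        self.emitted: List[Tuple[str, int]] = []  # recorded metrics, for illustration

    def put(self, key: Hashable, value: Any) -> None:
        with self._lock:
            # Critical section: mutate the cache and capture what to report.
            self._items[key] = value
            self._items.move_to_end(key)
            evicted = self._max_size is not None and len(self._items) > self._max_size
            if evicted:
                self._items.popitem(last=False)  # drop the least-recently-used entry
            size = len(self._items)
        # Lock released: emit metrics from the captured values only.
        if evicted:
            self.emitted.append(("cache.evictions", 1))
        self.emitted.append(("cache.size", size))
```

The lock then covers only the dictionary operations, so a slow metrics backend cannot hold up other threads waiting on the cache.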
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
dheerajturaga commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3061012692
##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +46,24 @@ class DBDagBag:
     :meta private:
     """
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size  # None = unbounded
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            self._dags[version_id] = serialized_dag_model
+            self._dags.move_to_end(version_id)
+            if (
+                self._max_dag_version_cache_size is not None
+                and len(self._dags) > self._max_dag_version_cache_size
+            ):
+                self._dags.popitem(last=False)
+                Stats.incr("dag_bag.cache.evictions")
+            Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)
Review Comment:
done!
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3045612891
##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
         assert resp2.status_code == 200
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy, so deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit
+
+        # Insert a 4th entry: ids[1] (now LRU) should be evicted, not ids[0]
+        bag._read_dag(self._make_model(uuid4()))
+
+        assert ids[0] in bag._dags  # promoted to MRU, survives
+        assert ids[1] not in bag._dags  # was LRU after ids[0] promoted, evicted
+
+    def test_failed_deserialization_not_cached(self):
+        """Entries whose .dag property is falsy are not inserted into the cache."""
+        bag = self._make_bag(max_size=10)
+        m = mock.MagicMock()
+        m.dag_version_id = uuid4()
+        m.dag = None  # deserialization failure
+
+        bag._read_dag(m)
Review Comment:
`test_failed_deserialization_not_cached` uses a bare `mock.MagicMock()` for
the serialized DAG model. For this specific test you only need a couple of
attributes; using a small stub object or
`Mock(spec_set=["dag_version_id","dag","load_op_links"])` makes the test
stricter and less likely to mask future API changes.
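A short sketch of what the stricter stub buys, using only `unittest.mock` from the standard library. The misspelled attribute is deliberate and purely illustrative.

```python
from unittest import mock
from uuid import uuid4

# spec_set limits the mock to exactly these attribute names.
model = mock.Mock(spec_set=["dag_version_id", "dag", "load_op_links"])
model.dag_version_id = uuid4()
model.dag = None  # simulate a failed deserialization
model.load_op_links = True

try:
    model.dag_verison_id = uuid4()  # deliberate typo: not in spec_set
    typo_rejected = False
except AttributeError:
    typo_rejected = True
```

With a bare `MagicMock()` the misspelled assignment would succeed silently, which is the masking risk the comment describes.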
##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
         assert resp2.status_code == 200
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy, so deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit
Review Comment:
New tests use `session = mock.MagicMock()` without `spec`/`autospec`. Using
an autospecced `Session` (or at least a `Mock(spec_set=["get"])`) helps ensure
the test asserts against a realistic interface and prevents typos/incorrect
method names from passing silently.
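A sketch of the autospec approach. `FakeSession` is a hypothetical stand-in so the example stays self-contained; in the real test the spec would presumably be the SQLAlchemy `Session` class, and the two-argument `get` signature here is an assumption of the sketch.

```python
from unittest import mock


class FakeSession:
    """Hypothetical stand-in for a database session; only `get` is modelled."""

    def get(self, entity, ident):
        raise NotImplementedError


session = mock.create_autospec(FakeSession, instance=True)
session.get.assert_not_called()  # passes: nothing has called get() yet

try:
    session.gett  # misspelled attribute is rejected by the spec
    typo_caught = False
except AttributeError:
    typo_caught = True

try:
    session.get("only-one-arg")  # wrong arity for get(entity, ident)
    signature_checked = False
except TypeError:
    signature_checked = True
```

An unspecced `MagicMock()` would accept both the misspelled attribute and the wrong call signature without complaint.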
##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self,
session, dag_maker, test_c
a
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
pierrejeambrun commented on PR #64326: URL: https://github.com/apache/airflow/pull/64326#issuecomment-4199585820 cc: @kaxil
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
shivaam commented on PR #64326: URL: https://github.com/apache/airflow/pull/64326#issuecomment-4188202013 FYI @kaxil: this overlaps with your #60804, which tackles the same DBDagBag._dags memory growth with a similar LRU approach. Wanted to make sure you both were aware of each other's PRs.
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
jscheffl commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3034292445
##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -1826,6 +1826,17 @@ api:
       type: string
       example: "/etc/airflow/certs/client.key"
       default: ~
+    dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the API server's in-memory LRU cache.
+        The API server may serve historical run views so it benefits from retaining older
+        versions. When the limit is reached the least-recently-used version is evicted.
+        The scheduler does not use this setting; its cache is naturally bounded by the
+        number of active DAGs.
+      version_added: 3.2.0
Review Comment:
Needs to be 3.2.1, as we are a bit too late for 3.2.0.
```suggestion
version_added: 3.2.1
```
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
dheerajturaga commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4158705889
> Nice. Seems like a real production bug. A few thoughts:
>
> 1. Default of 512 may be too low. The scheduler processes all active DAGs every cycle. With 1000+ DAGs, a 512 cache means constant eviction and re-fetching from the DB on every loop. The API server's Execution API also serves worker requests for every task state transition, so it can accumulate entries fast too. Consider starting higher (2048+) and letting people tune down; it's easier to reduce a known number than to discover you need to increase one you didn't know existed.
> 2. A single config for both scheduler and API server may not be ideal. The scheduler's working set is bounded (latest version per active DAG) and performance-sensitive, so it needs a cache big enough to hold all active DAGs. There are no metrics for the cache, which will also cause problems in debugging.

Done! The scheduler's cache is no longer bounded; only the API server's cache size is configurable. I also added metrics to track it.
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
shivaam commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4146318780
Nice. Seems like a real production bug. A few thoughts:
1. Default of 512 may be too low. The scheduler processes all active DAGs every cycle. With 1000+ DAGs, a 512 cache means constant eviction and re-fetching from the DB on every loop. The API server's Execution API also serves worker requests for every task state transition, so it can accumulate entries fast too. Consider starting higher (2048+) and letting people tune down; it's easier to reduce a known number than to discover you need to increase one you didn't know existed.
2. A single config for both scheduler and API server may not be ideal. The scheduler's working set is bounded (latest version per active DAG) and performance-sensitive, so it needs a cache big enough to hold all active DAGs.
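To make the first point concrete, here is a small simulation of an LRU cache under a repeated scan. It is only a sketch: the `hit_rate` helper is hypothetical, and the strictly in-order loop over every key is a worst-case assumption that may not match the scheduler's real access pattern.

```python
from collections import OrderedDict


def hit_rate(cache_size: int, working_set: int, cycles: int) -> float:
    """Hit rate of an LRU cache under a repeated in-order scan of `working_set` keys."""
    cache: "OrderedDict[int, None]" = OrderedDict()
    hits = lookups = 0
    for _ in range(cycles):
        for key in range(working_set):
            lookups += 1
            if key in cache:
                hits += 1
                cache.move_to_end(key)  # promote to most-recently-used
            else:
                cache[key] = None  # simulate a DB fetch followed by caching
                if len(cache) > cache_size:
                    cache.popitem(last=False)  # evict the least-recently-used key
    return hits / lookups


too_small = hit_rate(cache_size=512, working_set=1000, cycles=5)   # 0.0
big_enough = hit_rate(cache_size=2048, working_set=1000, cycles=5)  # 0.8
```

Under this access pattern a cache smaller than the working set never hits at all, because each key is evicted before the scan comes back to it; once the cache holds the whole working set, only the first cycle misses.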
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
dheerajturaga commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3002703485
##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -517,6 +517,17 @@ core:
       type: boolean
       example: ~
       default: "False"
+    max_dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the in-memory cache used by DBDagBag.
+        When the limit is reached the least-recently-used version is evicted. Increase this
+        value if you have many concurrently active DAG versions and can afford the memory;
+        decrease it to reduce the memory footprint of long-running API server or scheduler
+        processes.
+      version_added: 3.1.0
Review Comment:
@vatsrahul1001, this is critical enough for 3.2.0 IMO. I'm having to restart my servers every week to stay within memory bounds.
Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]
eladkal commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3002645327
##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -517,6 +517,17 @@ core:
       type: boolean
       example: ~
       default: "False"
+    max_dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the in-memory cache used by DBDagBag.
+        When the limit is reached the least-recently-used version is evicted. Increase this
+        value if you have many concurrently active DAG versions and can afford the memory;
+        decrease it to reduce the memory footprint of long-running API server or scheduler
+        processes.
+      version_added: 3.1.0
Review Comment:
```suggestion
version_added: 3.2.0
```
If @vatsrahul1001 allows it, otherwise 3.2.1
