Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-11 Thread via GitHub


jscheffl commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4229148983

   Thanks @dheerajturaga for the efforts anyway! Looking forward to a fix!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-10 Thread via GitHub


kaxil closed pull request #64326: Fix API server memory leak: bound DBDagBag 
version cache with LRU eviction
URL: https://github.com/apache/airflow/pull/64326





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-10 Thread via GitHub


kaxil commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4227149496

Closing this in favor of implementing it in 
https://github.com/apache/airflow/pull/60804





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-10 Thread via GitHub


dheerajturaga commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4226438990

   > Compared to Kaxil's version, I believe this implementation isn't thread-safe, which is a problem. Can we fill that gap?
   > 
   > Otherwise looking good to me.
   
   Should be thread-safe now. Let me know if there are any other objections.





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-10 Thread via GitHub


Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3066476691


##
shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml:
##
@@ -292,6 +292,24 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "dag_bag.cache.hits"
+    description: "Number of DBDagBag cache hits (DAG version found in memory)"
+    type: "counter"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "dag_bag.cache.misses"
+    description: "Number of DBDagBag cache misses (DAG version fetched from DB)"

Review Comment:
   The implementation increments `dag_bag.cache.misses` before the DB lookup, 
including cases where the DB does not contain the requested `DagVersion` / 
`SerializedDagModel`. To match behavior, update the description to reflect “not 
found in memory (DB lookup performed)” rather than “fetched from DB”.
   ```suggestion
       description: "Number of DBDagBag cache misses (DAG version not found in memory; DB lookup performed)"
   ```
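
A minimal sketch of the counting order described above, not the PR's actual code: the miss counter is bumped before the lookup, so a version that is absent from the database still counts as a miss. The `database` dict, the `stats` counter and the `get` helper are hypothetical stand-ins for the real DB session and `Stats` client.

```python
from collections import Counter

stats = Counter()                 # hypothetical stand-in for the metrics client
cache = {}                        # in-memory cache keyed by version id
database = {"v1": "dag-one"}      # hypothetical stand-in for the metadata DB


def get(version_id):
    """Return the cached value, falling back to the (fake) database."""
    if version_id in cache:
        stats["dag_bag.cache.hits"] += 1
        return cache[version_id]
    # Counted before the lookup, so it also covers "not in the DB either".
    stats["dag_bag.cache.misses"] += 1
    value = database.get(version_id)
    if value is not None:
        cache[version_id] = value
    return value


get("v1")        # miss, then cached
get("v1")        # hit
get("missing")   # miss, nothing fetched
```

Here two misses are recorded although only one value was ever fetched, which is why "DB lookup performed" describes the counter more accurately than "fetched from DB".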



##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (

Review Comment:
   On Python versions before 3.13, `threading.Lock` is a factory function at 
runtime (returning a `_thread.lock`), so using `threading.Lock` as a type in a 
union is likely to fail strict type-checking. Prefer annotating `_lock` as a 
context manager type (e.g., `contextlib.AbstractContextManager[None]` / 
`typing.ContextManager[None]`) and assign either `threading.Lock()` or 
`contextlib.nullcontext()` to it.
   ```suggestion
   self._lock: contextlib.AbstractContextManager[None] = (
   ```
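
A small self-contained sketch of that annotation pattern. The `VersionCache` class and its `put` method are hypothetical, not part of Airflow. It uses `AbstractContextManager[Any]` rather than `[None]` on the assumption that a type checker sees `Lock.__enter__` as returning `bool` while `nullcontext()` yields `None`, so `Any` covers both.

```python
import contextlib
import threading
from contextlib import AbstractContextManager
from typing import Any, Optional


class VersionCache:
    """Hypothetical cache that only pays for locking when it is bounded."""

    def __init__(self, max_cache_size: Optional[int] = None) -> None:
        self._items: dict = {}
        # Annotate by capability (usable in a `with` block), not by concrete type.
        self._lock: AbstractContextManager[Any] = (
            threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
        )

    def put(self, key: str, value: int) -> None:
        with self._lock:
            self._items[key] = value


bounded = VersionCache(max_cache_size=4)
unbounded = VersionCache()
bounded.put("a", 1)
unbounded.put("b", 2)
```

Both objects can be entered repeatedly with `with`, so the calling code does not need to know which one it holds.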



##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +48,31 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
+        # Use a real lock only when bounded caching is enabled (API server mode).
+        # The scheduler uses max_cache_size=None (unbounded, single-threaded) and
+        # gets a no-op context manager to avoid any locking overhead.
+        self._lock: threading.Lock | contextlib.nullcontext[None] = (
+            threading.Lock() if max_cache_size is not None else contextlib.nullcontext()
+        )
 
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            with self._lock:
+                self._dags[version_id] = serialized_dag_model
+                self._dags.move_to_end(version_id)
+                if (
+                    self._max_dag_version_cache_size is not None
+                    and len(self._dags) > self._max_dag_version_cache_size
+                ):
+                    self._dags.popitem(last=False)
+                    Stats.incr("dag_bag.cache.evictions")
+                Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)

Review Comment:
   `Stats.incr`/`Stats.gauge` are executed while holding the cache lock, 
increasing contention risk if metric emission performs non-trivial work (or 
blocks internally). Capture the eviction flag and current size under the lock, 
then emit metrics after releasing the lock to keep the critical section minimal.
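
One way this could look, as a sketch rather than the PR's implementation: the `BoundedCache` class and its `emit` callback are hypothetical stand-ins for `DBDagBag` and the `Stats` client. Only the dictionary mutation happens under the lock. The eviction flag and size are captured there and reported afterwards.

```python
import threading
from collections import OrderedDict


class BoundedCache:
    """Hypothetical LRU cache that emits metrics outside its critical section."""

    def __init__(self, max_size, emit):
        self._max_size = max_size
        self._items = OrderedDict()
        self._lock = threading.Lock()
        self._emit = emit  # hypothetical metrics callback: emit(name, value)

    def put(self, key, value):
        with self._lock:
            self._items[key] = value
            self._items.move_to_end(key)
            evicted = len(self._items) > self._max_size
            if evicted:
                self._items.popitem(last=False)  # drop the least-recently-used entry
            size = len(self._items)
        # The lock is released here; metric emission can no longer block other threads.
        if evicted:
            self._emit("dag_bag.cache.evictions", 1)
        self._emit("dag_bag.cache.size", size)


events = []
cache = BoundedCache(max_size=2, emit=lambda name, value: events.append((name, value)))
for key in ("a", "b", "c"):
    cache.put(key, key.upper())
```

The trade-off is that the reported size may be slightly stale by the time it is emitted, which is usually acceptable for a sampled gauge.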






Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-09 Thread via GitHub


dheerajturaga commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3061012692


##
airflow-core/src/airflow/models/dagbag.py:
##
@@ -44,14 +46,24 @@ class DBDagBag:
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[UUID, SerializedDagModel] = {}  # dag_version_id to dag
+    def __init__(self, load_op_links: bool = True, max_cache_size: int | None = None) -> None:
+        self._max_dag_version_cache_size = max_cache_size  # None = unbounded
+        self._dags: OrderedDict[UUID, SerializedDagModel] = OrderedDict()
         self.load_op_links = load_op_links
 
     def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
         serialized_dag_model.load_op_links = self.load_op_links
         if dag := serialized_dag_model.dag:
-            self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
+            version_id = serialized_dag_model.dag_version_id
+            self._dags[version_id] = serialized_dag_model
+            self._dags.move_to_end(version_id)
+            if (
+                self._max_dag_version_cache_size is not None
+                and len(self._dags) > self._max_dag_version_cache_size
+            ):
+                self._dags.popitem(last=False)
+                Stats.incr("dag_bag.cache.evictions")
+            Stats.gauge("dag_bag.cache.size", len(self._dags), rate=0.1)

Review Comment:
   done!






Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-07 Thread via GitHub


Copilot commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3045612891


##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy — deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit
+
+        # Insert a 4th entry — ids[1] (now LRU) should be evicted, not ids[0]
+        bag._read_dag(self._make_model(uuid4()))
+
+        assert ids[0] in bag._dags  # promoted to MRU, survives
+        assert ids[1] not in bag._dags  # was LRU after ids[0] promoted, evicted
+
+    def test_failed_deserialization_not_cached(self):
+        """Entries whose .dag property is falsy are not inserted into the cache."""
+        bag = self._make_bag(max_size=10)
+        m = mock.MagicMock()
+        m.dag_version_id = uuid4()
+        m.dag = None  # deserialization failure
+
+        bag._read_dag(m)

Review Comment:
   `test_failed_deserialization_not_cached` uses a bare `mock.MagicMock()` for 
the serialized DAG model. For this specific test you only need a couple of 
attributes; using a small stub object or 
`Mock(spec_set=["dag_version_id","dag","load_op_links"])` makes the test 
stricter and less likely to mask future API changes.
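
A short illustration of the stricter stub, using only `unittest.mock` from the standard library. The attribute names come from the comment above; the misspelled attribute is deliberate, to show what `spec_set` rejects.

```python
from unittest import mock
from uuid import uuid4

# Only these three attributes may be read or assigned on the stub.
model = mock.Mock(spec_set=["dag_version_id", "dag", "load_op_links"])
model.dag_version_id = uuid4()
model.dag = None            # simulate a failed deserialization
model.load_op_links = True  # the attribute _read_dag assigns

try:
    model.dag_verison_id = uuid4()  # deliberate typo: not in spec_set
except AttributeError:
    typo_rejected = True
else:
    typo_rejected = False
```

A bare `MagicMock` would accept the misspelled assignment silently, so a rename in the production code could go unnoticed by the test.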



##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker, test_c
         assert resp2.status_code == 200
 
         assert self.dagbag_call_counter["count"] == 1
+
+
+class TestDBDagBagLRUCache:
+    """Tests for the bounded LRU eviction behaviour of DBDagBag._dags."""
+
+    def _make_bag(self, max_size: int) -> DBDagBag:
+        return DBDagBag(max_cache_size=max_size)
+
+    def _make_model(self, version_id):
+        m = mock.MagicMock()
+        m.dag_version_id = version_id
+        m.dag = mock.MagicMock()  # truthy — deserialization succeeds
+        return m
+
+    def test_cache_bounded_by_max_size(self):
+        """Inserting beyond max_size evicts the least-recently-used entry."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(4)]
+        for uid in ids:
+            bag._read_dag(self._make_model(uid))
+
+        assert len(bag._dags) == 3
+        assert ids[0] not in bag._dags  # first inserted → LRU → evicted
+        assert ids[3] in bag._dags
+
+    def test_cache_hit_promotes_to_mru(self):
+        """A cache hit via get_serialized_dag_model promotes the entry to MRU."""
+        bag = self._make_bag(max_size=3)
+        ids = [uuid4() for _ in range(3)]
+        models = {uid: self._make_model(uid) for uid in ids}
+        for uid in ids:
+            bag._read_dag(models[uid])
+
+        # Re-access ids[0] through get_serialized_dag_model to promote it
+        session = mock.MagicMock()
+        bag.get_serialized_dag_model(ids[0], session=session)
+        session.get.assert_not_called()  # should be a pure cache hit

Review Comment:
   New tests use `session = mock.MagicMock()` without `spec`/`autospec`. Using 
an autospecced `Session` (or at least a `Mock(spec_set=["get"])`) helps ensure 
the test asserts against a realistic interface and prevents typos/incorrect 
method names from passing silently.
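
To make the failure mode concrete, here is a sketch comparing a loose and a spec-restricted session mock. The misspelled `gte` attribute is a made-up typo for illustration only.

```python
from unittest import mock

# Loose mock: any attribute name is auto-created, so a typo still "passes".
loose = mock.MagicMock()
loose.gte.assert_not_called()  # misspelled `get`, yet no error is raised

# Restricted mock: only `get` exists on the session stand-in.
strict = mock.Mock(spec_set=["get"])
strict.get.assert_not_called()  # the intended assertion still works

try:
    strict.gte.assert_not_called()  # same typo now fails loudly
except AttributeError:
    typo_caught = True
else:
    typo_caught = False
```

`mock.create_autospec(Session)` would go further and also check call signatures, at the cost of importing the real SQLAlchemy class into the test.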



##
airflow-core/tests/unit/api_fastapi/common/test_dagbag.py:
##
@@ -82,3 +84,76 @@ def test_dagbag_used_as_singleton_in_dependency(self, 
session, dag_maker, test_c
 a

Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-07 Thread via GitHub


pierrejeambrun commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4199585820

   cc: @kaxil 





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-04 Thread via GitHub


shivaam commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4188202013

   FYI @kaxil — this overlaps with your #60804, which tackles the same 
DBDagBag._dags memory growth with a similar LRU approach. Wanted to make sure 
you both were aware of each other's PRs.
   
   





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-04-03 Thread via GitHub


jscheffl commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3034292445


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -1826,6 +1826,17 @@ api:
       type: string
       example: "/etc/airflow/certs/client.key"
       default: ~
+    dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the API server's in-memory LRU cache.
+        The API server may serve historical run views so it benefits from retaining older
+        versions. When the limit is reached the least-recently-used version is evicted.
+        The scheduler does not use this setting — its cache is naturally bounded by the
+        number of active DAGs.
+      version_added: 3.2.0

Review Comment:
   This needs to be 3.2.1, as we are a bit too late for 3.2.0.
   ```suggestion
 version_added: 3.2.1
   ```






Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-03-30 Thread via GitHub


dheerajturaga commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4158705889

   > Nice. Seems like a real production bug. A few thoughts:
   > 
   > 1. Default of 512 may be too low. The scheduler processes all active DAGs 
every cycle. With 1000+ DAGs, a 512 cache means constant eviction and 
re-fetching from the DB on every loop. The API server's Execution API also 
serves worker requests for every task state transition, so it can accumulate 
entries fast too. Consider starting higher (2048+) and letting people tune down 
— it's easier to reduce a known number than to discover you need to increase 
one you didn't know existed.
   > 2. A single config for both scheduler and API server may not be ideal. The 
scheduler's working set is bounded (latest version per active DAG) and 
performance-sensitive — it needs a cache big enough to hold all active DAGs. 
There are no metrics for the cache which will also cause problems in debugging
   
   Done! The scheduler is no longer bounded by the cache; only the API server 
has a configurable cache size. I also added metrics to track it.





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-03-27 Thread via GitHub


shivaam commented on PR #64326:
URL: https://github.com/apache/airflow/pull/64326#issuecomment-4146318780

   Nice. Seems like a real production bug. A few thoughts:
   1. Default of 512 may be too low. The scheduler processes all active DAGs 
every cycle. With 1000+ DAGs, a 512 cache means constant eviction and 
re-fetching from the DB on every loop. The API server's Execution API also 
serves worker requests for every task state transition, so it can accumulate 
entries fast too. Consider starting higher (2048+) and letting people tune down 
— it's easier to reduce a known number than to discover you need to increase 
one you didn't know existed.
   2. A single config for both scheduler and API server may not be ideal. The 
scheduler's working set is bounded (latest version per active DAG) and 
performance-sensitive — it needs a cache big enough to hold all active DAGs. 
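
The eviction concern in point 1 can be reproduced with a toy simulation. This is a simplified model under stated assumptions, not a measurement of Airflow: it assumes every cycle touches each DAG once in the same order, and `hit_rate` is a hypothetical helper written for this illustration.

```python
from collections import OrderedDict


def hit_rate(cache_size, num_dags, cycles):
    """Fraction of lookups served from an LRU cache under a cyclic scan."""
    cache = OrderedDict()
    hits = lookups = 0
    for _ in range(cycles):
        for dag in range(num_dags):
            lookups += 1
            if dag in cache:
                hits += 1
                cache.move_to_end(dag)  # promote to most-recently-used
            else:
                cache[dag] = True
                if len(cache) > cache_size:
                    cache.popitem(last=False)  # evict least-recently-used
    return hits / lookups


too_small = hit_rate(cache_size=512, num_dags=1000, cycles=5)
large_enough = hit_rate(cache_size=2048, num_dags=1000, cycles=5)
```

Under this access pattern an LRU cache smaller than the working set never hits, because each entry is evicted before the scan returns to it. A cache that holds all 1000 entries misses only during the first cycle.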





Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-03-27 Thread via GitHub


dheerajturaga commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3002703485


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -517,6 +517,17 @@ core:
       type: boolean
       example: ~
       default: "False"
+    max_dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the in-memory cache used by DBDagBag.
+        When the limit is reached the least-recently-used version is evicted. Increase this
+        value if you have many concurrently active DAG versions and can afford the memory;
+        decrease it to reduce the memory footprint of long-running API server or scheduler
+        processes.
+      version_added: 3.1.0

Review Comment:
   @vatsrahul1001, this is critical enough for 3.2.0 IMO. I'm having to restart 
my servers every week to stay within memory bounds.






Re: [PR] Fix API server memory leak: bound DBDagBag version cache with LRU eviction [airflow]

2026-03-27 Thread via GitHub


eladkal commented on code in PR #64326:
URL: https://github.com/apache/airflow/pull/64326#discussion_r3002645327


##
airflow-core/src/airflow/config_templates/config.yml:
##
@@ -517,6 +517,17 @@ core:
       type: boolean
       example: ~
       default: "False"
+    max_dag_version_cache_size:
+      description: |
+        Maximum number of DAG versions to keep in the in-memory cache used by DBDagBag.
+        When the limit is reached the least-recently-used version is evicted. Increase this
+        value if you have many concurrently active DAG versions and can afford the memory;
+        decrease it to reduce the memory footprint of long-running API server or scheduler
+        processes.
+      version_added: 3.1.0

Review Comment:
   ```suggestion
 version_added: 3.2.0
   ```
   
   If @vatsrahul1001 allows it; otherwise 3.2.1.


