[
https://issues.apache.org/jira/browse/SPARK-57183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Iván Morales updated SPARK-57183:
---------------------------------
Description:
In unbounded memory mode (the default, {{{}boundedMemoryUsage = false{}}}),
Spark's RocksDB state store wrapper creates a new {{LRUCache}} per instance in
{{RocksDBMemoryManager }}but never calls {{lruCache.close()}} in
{{{}RocksDB.close(){}}}. The Java {{LRUCache}} wrapper holds a C++
{{{}shared_ptr<Cache>{}}}, preventing native memory from being freed until the
JVM GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM running
streaming tests), GC rarely runs, so unclosed caches accumulate. Over a CI run
with ~49 RocksDB-heavy test suites this accumulates 4-8 GB of native memory and
causes OOM kills.
*Affected file*
{{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}
*Related:* SPARK-56523 (Statistics native memory leak – already fixed). This is
a separate, unfixed issue.
h2. Root cause
In {{{}RocksDBMemoryManager.scala{}}}, unbounded mode creates a new cache per
instance:
{code:scala}
} else {
(null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024)) // new instance
per RocksDB
}
{code}
But {{RocksDB.close()}} never closes it:
{code:scala}
def close(): Unit = {
closeDB()
readOptions.close()
writeOptions.close()
flushOptions.close()
nativeStats.close() // added by SPARK-56523 fix
rocksDbOptions.close()
dbLogger.close()
// lruCache.close() <-- MISSING
}
{code}
In bounded mode ({{{}boundedMemoryUsage = true{}}}) the cache is a shared
singleton in {{RocksDBMemoryManager}} and must not be closed per instance. In
unbounded mode (the default) each instance owns its cache and must close it.
h2. Fix
Add the following in {{{}RocksDB.close(){}}}, after {{{}dbLogger.close(){}}}:
{code:scala}
if (!conf.boundedMemoryUsage && lruCache != null) {
lruCache.close()
}
{code}
h2. Evidence
Standalone reproducer tool: [https://github.com/kete1987/rocksdb-leak-tool]
Results (60 iterations, --no-gc, --cache-mb 8, Windows 11 / Java 11.0.28):
||Mode||Total growth||Per-iter (post-warmup)||Pattern||
|leak|+578 MB|~8.5 MB|Linear|
|fixed|+128 MB|~0 KB|Flat|
The ~8.5 MB/iter matches exactly {{blockCacheSizeMB = 8}} (Spark default),
confirming {{LRUCache}} as the sole leak source.
was:
In unbounded memory mode (the default, {{boundedMemoryUsage = false}}), Spark's
RocksDB
state store wrapper creates a new {{LRUCache}} per instance in
{{RocksDBMemoryManager}}
but never calls {{lruCache.close()}} in {{RocksDB.close()}}. The Java
{{LRUCache}} wrapper
holds a C++ {{shared_ptr<Cache>}}, preventing native memory from being freed
until the JVM
GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM running
streaming
tests), GC rarely runs, so unclosed caches accumulate. Over a CI run with ~49
RocksDB-heavy test suites this accumulates 4-8 GB of native memory and causes
OOM kills.
*Affected file*
{{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}
*Related:* SPARK-56523 (Statistics native memory leak -- already fixed). This
is a
separate, unfixed issue.
h2. Root cause
In {{RocksDBMemoryManager.scala}}, unbounded mode creates a new cache per
instance:
{code:scala}
} else {
(null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024)) // new instance
per RocksDB
}
{code}
But {{RocksDB.close()}} never closes it:
{code:scala}
def close(): Unit = {
closeDB()
readOptions.close()
writeOptions.close()
flushOptions.close()
nativeStats.close() // added by SPARK-56523 fix
rocksDbOptions.close()
dbLogger.close()
// lruCache.close() <-- MISSING
}
{code}
In bounded mode ({{boundedMemoryUsage = true}}) the cache is a shared singleton
in
{{RocksDBMemoryManager}} and must not be closed per instance. In unbounded mode
(the
default) each instance owns its cache and must close it.
h2. Fix
Add the following in {{RocksDB.close()}}, after {{dbLogger.close()}}:
{code:scala}
if (!conf.boundedMemoryUsage && lruCache != null) {
lruCache.close()
}
{code}
h2. Evidence
Standalone reproducer tool: https://github.com/kete1987/rocksdb-leak-tool
Results (60 iterations, --no-gc, --cache-mb 8, Windows 11 / Java 11.0.28):
|| Mode || Total growth || Per-iter (post-warmup) || Pattern ||
| leak | +578 MB | ~8.5 MB | Linear |
| fixed | +128 MB | ~0 KB | Flat |
The ~8.5 MB/iter matches exactly {{blockCacheSizeMB = 8}} (Spark default),
confirming {{LRUCache}} as the sole leak source.
> RocksDB state store leaks native memory when LRUCache is not closed in
> unbounded mode
> -------------------------------------------------------------------------------------
>
> Key: SPARK-57183
> URL: https://issues.apache.org/jira/browse/SPARK-57183
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Iván Morales
> Priority: Major
> Labels: pull-request-available
>
> In unbounded memory mode (the default, {{{}boundedMemoryUsage = false{}}}),
> Spark's RocksDB state store wrapper creates a new {{LRUCache}} per instance
> in {{RocksDBMemoryManager }}but never calls {{lruCache.close()}} in
> {{{}RocksDB.close(){}}}. The Java {{LRUCache}} wrapper holds a C++
> {{{}shared_ptr<Cache>{}}}, preventing native memory from being freed until
> the JVM GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM
> running streaming tests), GC rarely runs, so unclosed caches accumulate. Over
> a CI run with ~49 RocksDB-heavy test suites this accumulates 4-8 GB of native
> memory and causes OOM kills.
> *Affected file*
> {{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}
> *Related:* SPARK-56523 (Statistics native memory leak – already fixed). This
> is a separate, unfixed issue.
> h2. Root cause
> In {{{}RocksDBMemoryManager.scala{}}}, unbounded mode creates a new cache per
> instance:
> {code:scala}
> } else {
> (null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024)) // new instance
> per RocksDB
> }
> {code}
> But {{RocksDB.close()}} never closes it:
> {code:scala}
> def close(): Unit = {
> closeDB()
> readOptions.close()
> writeOptions.close()
> flushOptions.close()
> nativeStats.close() // added by SPARK-56523 fix
> rocksDbOptions.close()
> dbLogger.close()
> // lruCache.close() <-- MISSING
> }
> {code}
> In bounded mode ({{{}boundedMemoryUsage = true{}}}) the cache is a shared
> singleton in {{RocksDBMemoryManager}} and must not be closed per instance. In
> unbounded mode (the default) each instance owns its cache and must close it.
> h2. Fix
> Add the following in {{{}RocksDB.close(){}}}, after {{{}dbLogger.close(){}}}:
> {code:scala}
> if (!conf.boundedMemoryUsage && lruCache != null) {
> lruCache.close()
> }
> {code}
> h2. Evidence
> Standalone reproducer tool: [https://github.com/kete1987/rocksdb-leak-tool]
> Results (60 iterations, --no-gc, --cache-mb 8, Windows 11 / Java 11.0.28):
> ||Mode||Total growth||Per-iter (post-warmup)||Pattern||
> |leak|+578 MB|~8.5 MB|Linear|
> |fixed|+128 MB|~0 KB|Flat|
> The ~8.5 MB/iter matches exactly {{blockCacheSizeMB = 8}} (Spark default),
> confirming {{LRUCache}} as the sole leak source.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]