[
https://issues.apache.org/jira/browse/SPARK-57183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Iván Morales updated SPARK-57183:
---------------------------------
Description:
In unbounded memory mode (the default, {{boundedMemoryUsage = false}}), Spark's
RocksDB state store wrapper creates a new {{LRUCache}} per instance in
{{RocksDBMemoryManager}} but never calls {{lruCache.close()}} in
{{RocksDB.close()}}. The Java {{LRUCache}} wrapper holds a C++
{{shared_ptr<Cache>}}, preventing native memory from being freed until the JVM
GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM running
streaming tests), GC rarely runs, so unclosed caches accumulate. Over a CI run
with ~49 RocksDB-heavy test suites this accumulates 4-8 GB of native memory and
causes OOM kills.
*Affected file:*
{{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}
*Related:* SPARK-56523 (Statistics native memory leak, already fixed). This is a
separate, unfixed issue.
h2. Root cause
In {{RocksDBMemoryManager.scala}}, unbounded mode creates a new cache per instance:
{code:scala}
} else {
  // Unbounded mode: a new cache instance per RocksDB instance
  (null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024))
}
{code}
But {{RocksDB.close()}} never closes it:
{code:scala}
def close(): Unit = {
  closeDB()
  readOptions.close()
  writeOptions.close()
  flushOptions.close()
  nativeStats.close() // added by SPARK-56523 fix
  rocksDbOptions.close()
  dbLogger.close()
  // lruCache.close() <-- MISSING
}
{code}
In bounded mode ({{boundedMemoryUsage = true}}) the cache is a shared singleton
in {{RocksDBMemoryManager}} and must not be closed per instance. In unbounded
mode (the default) each instance owns its cache and must close it.
h2. Fix
Add the following in {{RocksDB.close()}}, after {{dbLogger.close()}}:
{code:scala}
if (!conf.boundedMemoryUsage && lruCache != null) {
  lruCache.close()
}
{code}
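The ownership rule behind this guard can be illustrated without RocksDB. The following is a minimal sketch, not Spark code: {{FakeCache}}, {{FakeStore}}, and {{OwnershipDemo}} are hypothetical stand-ins introduced only for illustration, and the sketch assumes the cache is closed exactly when the store owns it (unbounded mode).
{code:scala}
// Hypothetical stand-in for a native-backed cache; it only records close().
final class FakeCache extends AutoCloseable {
  private var closed = false
  def isClosed: Boolean = closed
  override def close(): Unit = { closed = true }
}

// Hypothetical stand-in for the RocksDB wrapper.
// boundedMemoryUsage = true  -> the cache is shared and owned elsewhere
// boundedMemoryUsage = false -> the cache is owned by this store
final class FakeStore(boundedMemoryUsage: Boolean, cache: FakeCache)
    extends AutoCloseable {
  override def close(): Unit = {
    // Same guard as the proposed fix: close only a cache this store owns.
    if (!boundedMemoryUsage && cache != null) {
      cache.close()
    }
  }
}

object OwnershipDemo {
  def main(args: Array[String]): Unit = {
    val shared = new FakeCache
    new FakeStore(boundedMemoryUsage = true, shared).close()

    val owned = new FakeCache
    new FakeStore(boundedMemoryUsage = false, owned).close()

    // prints "shared closed: false, owned closed: true"
    println(s"shared closed: ${shared.isClosed}, owned closed: ${owned.isClosed}")
  }
}
{code}
The guard on {{boundedMemoryUsage}} is what keeps the shared singleton alive for other instances while still releasing per-instance caches deterministically.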
h2. Evidence
Standalone reproducer tool: https://github.com/kete1987/rocksdb-leak-tool
Results (60 iterations, --no-gc, --cache-mb 8, Windows 11 / Java 11.0.28):
|| Mode || Total growth || Per-iter (post-warmup) || Pattern ||
| leak | +578 MB | ~8.5 MB | Linear |
| fixed | +128 MB | ~0 KB | Flat |
The ~8.5 MB/iter growth closely matches {{blockCacheSizeMB = 8}} (Spark default),
which points to the unclosed {{LRUCache}} as the leak source.
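As a rough cross-check of the table, the arithmetic can be written out. This is only a sketch under two assumptions that are not stated in the report: that the fixed run's growth is a baseline unrelated to the leak, and that one cache of {{blockCacheSizeMB}} is leaked per iteration. {{LeakArithmetic}} and its field names are hypothetical.
{code:scala}
object LeakArithmetic {
  // Figures taken from the results table above.
  val iterations = 60
  val cacheMb = 8
  val leakGrowthMb = 578
  val fixedGrowthMb = 128

  // Growth attributable to the leak, assuming the fixed run is the baseline.
  val excessMb: Int = leakGrowthMb - fixedGrowthMb

  // Average excess per iteration over the whole run, warmup included.
  val excessPerIterMb: Double = excessMb.toDouble / iterations

  // Upper bound if every iteration leaked one completely filled cache.
  val fullCacheLeakMb: Int = iterations * cacheMb

  def main(args: Array[String]): Unit = {
    println(s"excess growth: $excessMb MB")             // 450 MB
    println(s"excess per iteration: $excessPerIterMb MB") // 7.5 MB
    println(s"one full cache per iteration: $fullCacheLeakMb MB") // 480 MB
  }
}
{code}
Under these assumptions the whole-run average (7.5 MB per iteration) is of the same order as the 8 MB cache size and the ~8.5 MB post-warmup figure.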
> RocksDB state store leaks native memory when LRUCache is not closed in
> unbounded mode
> -------------------------------------------------------------------------------------
>
> Key: SPARK-57183
> URL: https://issues.apache.org/jira/browse/SPARK-57183
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Iván Morales
> Priority: Major
>