[ https://issues.apache.org/jira/browse/SPARK-57183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Iván Morales updated SPARK-57183:
---------------------------------
    Description: 
In unbounded memory mode (the default, {{boundedMemoryUsage = false}}), Spark's RocksDB state store wrapper creates a new {{LRUCache}} per instance in {{RocksDBMemoryManager}} but never calls {{lruCache.close()}} in {{RocksDB.close()}}. The Java {{LRUCache}} wrapper holds a C++ {{shared_ptr<Cache>}}, so the native memory is not freed until the JVM GC finalizes the wrapper. Under low heap pressure (e.g., a 4 GB JVM running streaming tests), GC rarely runs, so unclosed caches accumulate. Over a CI run with ~49 RocksDB-heavy test suites, this accumulates 4-8 GB of native memory and causes OOM kills.
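As a rough sanity check on the 4-8 GB figure, the sketch below converts it into a count of leaked caches. It assumes (an assumption, not something measured in this report) that every unclosed cache is filled to its 8 MB capacity ({{blockCacheSizeMB = 8}}) before its wrapper is dropped; {{leakedCaches}} is a hypothetical helper.
{code:scala}
// Rough estimate: how many fully populated, unclosed block caches
// correspond to a given amount of leaked native memory.
// Assumption: each leaked cache holds its full capacity (8 MB by default).
def leakedCaches(leakedMB: Long, cacheMB: Long = 8L): Long = leakedMB / cacheMB

val low  = leakedCaches(4L * 1024) // 4 GB leaked
val high = leakedCaches(8L * 1024) // 8 GB leaked
println(s"$low to $high unclosed caches") // prints "512 to 1024 unclosed caches"
{code}
Spread over ~49 suites, that is roughly 10 to 21 leaked instances per suite under the full-capacity assumption.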

*Affected file*
{{sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala}}

*Related:* SPARK-56523 (Statistics native memory leak – already fixed). This is 
a separate, unfixed issue.
h2. Root cause

In {{RocksDBMemoryManager.scala}}, unbounded mode creates a new cache per instance:
{code:scala}
} else {
  (null, new LRUCache(conf.blockCacheSizeMB * 1024 * 1024))  // new instance per RocksDB
}
{code}
But {{RocksDB.close()}} never closes it:
{code:scala}
def close(): Unit = {
  closeDB()
  readOptions.close()
  writeOptions.close()
  flushOptions.close()
  nativeStats.close()    // added by SPARK-56523 fix
  rocksDbOptions.close()
  dbLogger.close()
  // lruCache.close()  <-- MISSING
}
{code}
In bounded mode ({{boundedMemoryUsage = true}}) the cache is a shared singleton in {{RocksDBMemoryManager}} and must not be closed per instance. In unbounded mode (the default) each instance owns its cache and must close it.
h2. Fix

Add the following in {{RocksDB.close()}}, after {{dbLogger.close()}}:
{code:scala}
if (!conf.boundedMemoryUsage && lruCache != null) {
  lruCache.close()
}
{code}
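The ownership rule behind the fix can be modelled without RocksDB. The sketch below is a simplified stand-in, not Spark or RocksJava code: {{NativeMemory}}, {{NativeCache}}, {{SharedCacheHolder}} and {{StateStoreInstance}} are hypothetical names, the fixed 8 MB shared cache size is an assumption, and a counter stands in for native memory. It shows that only an instance that created its own cache (unbounded mode) releases it on close, while the bounded-mode singleton survives individual closes.
{code:scala}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical counter standing in for native (off-heap) memory.
object NativeMemory {
  val allocated = new AtomicLong(0L)
}

// Hypothetical native-backed cache: "allocates" on construction, frees on close().
final class NativeCache(val capacityBytes: Long) extends AutoCloseable {
  NativeMemory.allocated.addAndGet(capacityBytes)
  private var closed = false
  override def close(): Unit = if (!closed) {
    closed = true
    NativeMemory.allocated.addAndGet(-capacityBytes)
  }
  def isClosed: Boolean = closed
}

// Hypothetical process-wide cache used in bounded mode (assumed size: 8 MB).
object SharedCacheHolder {
  lazy val cache: NativeCache = new NativeCache(8L * 1024 * 1024)
}

// Hypothetical state-store instance mirroring the ownership rule of the fix.
final class StateStoreInstance(boundedMemoryUsage: Boolean, blockCacheSizeMB: Long)
    extends AutoCloseable {
  private val lruCache: NativeCache =
    if (boundedMemoryUsage) SharedCacheHolder.cache       // shared: not owned
    else new NativeCache(blockCacheSizeMB * 1024 * 1024)  // owned by this instance

  override def close(): Unit = {
    // ... other per-instance resources would be closed here ...
    if (!boundedMemoryUsage && lruCache != null) {
      lruCache.close() // release only the cache this instance owns
    }
  }
}

// 60 open/close cycles in unbounded mode leave no native memory behind.
val baseline = NativeMemory.allocated.get()
for (_ <- 1 to 60) new StateStoreInstance(boundedMemoryUsage = false, blockCacheSizeMB = 8).close()
println(NativeMemory.allocated.get() - baseline) // prints 0
{code}
Removing the {{lruCache.close()}} call from the model makes the same loop grow the counter by 8 MB per cycle, which is the linear pattern reported below.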
h2. Evidence

Standalone reproducer tool: [https://github.com/kete1987/rocksdb-leak-tool]

Results (60 iterations, {{--no-gc}}, {{--cache-mb 8}}, Windows 11 / Java 11.0.28):
||Mode||Total growth||Per-iter (post-warmup)||Pattern||
|leak|+578 MB|~8.5 MB|Linear|
|fixed|+128 MB|~0 KB|Flat|

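The ~8.5 MB/iter closely matches the configured cache size ({{--cache-mb 8}}, equal to the Spark default {{blockCacheSizeMB = 8}}), and the growth disappears with the fix, indicating that the unclosed {{LRUCache}} accounts for essentially all of the per-iteration leak.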
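The "Per-iter (post-warmup)" column is a growth rate. The reproducer's exact method is not described here; one common way to derive such a figure, assumed purely for illustration, is a least-squares slope over the samples that follow a warmup window. The helper {{perIterGrowthMB}}, the warmup length, and the sample series below are hypothetical (synthetic, not measured data).
{code:scala}
// Least-squares slope (MB per iteration) of memory samples after a warmup window.
// Hypothetical helper; the warmup length is an assumed parameter.
def perIterGrowthMB(samplesMB: Seq[Double], warmup: Int): Double = {
  val ys = samplesMB.drop(warmup)
  require(ys.size >= 2, "need at least two post-warmup samples")
  val xs = ys.indices.map(_.toDouble)
  val xMean = xs.sum / xs.size
  val yMean = ys.sum / ys.size
  val cov = xs.zip(ys).map { case (x, y) => (x - xMean) * (y - yMean) }.sum
  val varX = xs.map(x => (x - xMean) * (x - xMean)).sum
  cov / varX
}

// Synthetic series: linear growth of 8 MB per iteration versus a flat series.
val leaky = (0 until 60).map(i => 200.0 + 8.0 * i)
val flat  = Seq.fill(60)(300.0)
println(perIterGrowthMB(leaky, warmup = 5))
println(perIterGrowthMB(flat, warmup = 5))
{code}
A slope over post-warmup samples ignores one-off startup allocations, which is why a run can show sizable total growth (as in the "fixed" row) while the per-iteration rate stays near zero.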


> RocksDB state store leaks native memory when LRUCache is not closed in unbounded mode
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-57183
>                 URL: https://issues.apache.org/jira/browse/SPARK-57183
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Iván Morales
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
