This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 045ec6a166c8 [SPARK-48208][SS] Skip providing memory usage metrics from RocksDB if bounded memory usage is enabled 045ec6a166c8 is described below commit 045ec6a166c8d2bdf73585fc4160c136e5f2888a Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Thu May 9 17:10:01 2024 +0900 [SPARK-48208][SS] Skip providing memory usage metrics from RocksDB if bounded memory usage is enabled ### What changes were proposed in this pull request? Skip providing memory usage metrics from RocksDB if bounded memory usage is enabled ### Why are the changes needed? Without this, we are providing memory usage that is the max usage per node at a partition level. For example, if we report this ``` "allRemovalsTimeMs" : 93, "commitTimeMs" : 32240, "memoryUsedBytes" : 15956211724278, "numRowsDroppedByWatermark" : 0, "numShufflePartitions" : 200, "numStateStoreInstances" : 200, ``` We have 200 partitions in this case. So the memory usage per partition / state store would be ~78GB. However, this node has 256GB memory total and we have 2 such nodes. We have configured our cluster to use 30% of available memory on each node for RocksDB which is ~77GB. So the memory being reported here is actually per node rather than per partition, which could be confusing for users. ### Does this PR introduce _any_ user-facing change? No - only a metrics reporting change ### How was this patch tested? Added unit tests ``` [info] Run completed in 10 seconds, 878 milliseconds. [info] Total number of tests run: 24 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 24, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #46491 from anishshri-db/task/SPARK-48208. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/execution/streaming/state/RocksDB.scala | 11 ++++++++++- .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 11 +++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index caecf817c12f..151695192281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -777,10 +777,19 @@ class RocksDB( .keys.filter(checkInternalColumnFamilies(_)).size val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - numInternalColFamilies + // if bounded memory usage is enabled, we share the block cache across all state providers + // running on the same node and account the usage to this single cache. In this case, its not + // possible to provide partition level or query level memory usage. 
+ val memoryUsage = if (conf.boundedMemoryUsage) { + 0L + } else { + readerMemUsage + memTableMemUsage + blockCacheUsage + } + RocksDBMetrics( numKeysOnLoadedVersion, numKeysOnWritingVersion, - readerMemUsage + memTableMemUsage + blockCacheUsage, + memoryUsage, pinnedBlocksMemUsage, totalSSTFilesBytes, nativeOpsLatencyMicros, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index ab2afa1b8a61..6086fd43846f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -1699,6 +1699,11 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.load(0) db.put("a", "1") db.commit() + if (boundedMemoryUsage == "true") { + assert(db.metricsOpt.get.totalMemUsageBytes === 0) + } else { + assert(db.metricsOpt.get.totalMemUsageBytes > 0) + } db.getWriteBufferManagerAndCache() } @@ -1709,6 +1714,11 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.load(0) db.put("a", "1") db.commit() + if (boundedMemoryUsage == "true") { + assert(db.metricsOpt.get.totalMemUsageBytes === 0) + } else { + assert(db.metricsOpt.get.totalMemUsageBytes > 0) + } db.getWriteBufferManagerAndCache() } @@ -1758,6 +1768,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.remove("a") db.put("c", "3") db.commit() + assert(db.metricsOpt.get.totalMemUsageBytes === 0) } } finally { RocksDBMemoryManager.resetWriteBufferManagerAndCache --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org