This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 045ec6a166c8 [SPARK-48208][SS] Skip providing memory usage metrics 
from RocksDB if bounded memory usage is enabled
045ec6a166c8 is described below

commit 045ec6a166c8d2bdf73585fc4160c136e5f2888a
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu May 9 17:10:01 2024 +0900

    [SPARK-48208][SS] Skip providing memory usage metrics from RocksDB if 
bounded memory usage is enabled
    
    ### What changes were proposed in this pull request?
    Skip providing memory usage metrics from RocksDB if bounded memory usage is 
enabled
    
    ### Why are the changes needed?
    Without this, we report memory usage that is actually the maximum usage per
node, but attribute it at the partition level.
    For example, if we report this
    ```
        "allRemovalsTimeMs" : 93,
        "commitTimeMs" : 32240,
        "memoryUsedBytes" : 15956211724278,
        "numRowsDroppedByWatermark" : 0,
        "numShufflePartitions" : 200,
        "numStateStoreInstances" : 200,
    ```
    
    We have 200 partitions in this case.
    So the memory usage per partition / state store would be ~78GB. However, 
this node has 256GB memory total and we have 2 such nodes. We have configured 
our cluster to use 30% of available memory on each node for RocksDB which is 
~77GB.
    So the memory being reported here is actually per node rather than per 
partition which could be confusing for users.
    
    ### Does this PR introduce _any_ user-facing change?
    No - only a metrics reporting change
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    [info] Run completed in 10 seconds, 878 milliseconds.
    [info] Total number of tests run: 24
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 24, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46491 from anishshri-db/task/SPARK-48208.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/state/RocksDB.scala  | 11 ++++++++++-
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala    | 11 +++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index caecf817c12f..151695192281 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -777,10 +777,19 @@ class RocksDB(
       .keys.filter(checkInternalColumnFamilies(_)).size
     val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - 
numInternalColFamilies
 
+    // if bounded memory usage is enabled, we share the block cache across all 
state providers
+    // running on the same node and account the usage to this single cache. In 
this case, it's not
+    // possible to provide partition-level or query-level memory usage.
+    val memoryUsage = if (conf.boundedMemoryUsage) {
+      0L
+    } else {
+      readerMemUsage + memTableMemUsage + blockCacheUsage
+    }
+
     RocksDBMetrics(
       numKeysOnLoadedVersion,
       numKeysOnWritingVersion,
-      readerMemUsage + memTableMemUsage + blockCacheUsage,
+      memoryUsage,
       pinnedBlocksMemUsage,
       totalSSTFilesBytes,
       nativeOpsLatencyMicros,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index ab2afa1b8a61..6086fd43846f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1699,6 +1699,11 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
               db.load(0)
               db.put("a", "1")
               db.commit()
+              if (boundedMemoryUsage == "true") {
+                assert(db.metricsOpt.get.totalMemUsageBytes === 0)
+              } else {
+                assert(db.metricsOpt.get.totalMemUsageBytes > 0)
+              }
               db.getWriteBufferManagerAndCache()
             }
 
@@ -1709,6 +1714,11 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
               db.load(0)
               db.put("a", "1")
               db.commit()
+              if (boundedMemoryUsage == "true") {
+                assert(db.metricsOpt.get.totalMemUsageBytes === 0)
+              } else {
+                assert(db.metricsOpt.get.totalMemUsageBytes > 0)
+              }
               db.getWriteBufferManagerAndCache()
             }
 
@@ -1758,6 +1768,7 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
             db.remove("a")
             db.put("c", "3")
             db.commit()
+            assert(db.metricsOpt.get.totalMemUsageBytes === 0)
           }
         } finally {
           RocksDBMemoryManager.resetWriteBufferManagerAndCache


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to