This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d80d80a56c4 [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations
1d80d80a56c4 is described below

commit 1d80d80a56c418f841e282ad753fad6671c3baae
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Dec 5 15:00:08 2023 +0900

    [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations
    
    ### What changes were proposed in this pull request?
    Require the RocksDB instance lock when acquiring metrics, so that metric retrieval cannot race with background operations such as maintenance, abort, or reload.
    
    ### Why are the changes needed?
    The changes are needed to avoid races where the stateful operator tries to set store metrics after the commit while the DB instance has already been closed, aborted, or reloaded by a background operation. We have seen a few query failures with the following stack trace caused by this race:
    ```
            org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent failure: Lost task 3.0 in stage 531.0 (TID 1544) (ip-10-110-29-251.us-west-2.compute.internal executor driver): java.lang.NullPointerException
            at org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838)
            at org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678)
            at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137)
            at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198)
            at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197)
            at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495)
            at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626)
            at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
            at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
            at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
            at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
            at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
            at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
            at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
            at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
            at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
            at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
            at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
            at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
            at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
            at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
            at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    ```
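
    The failure is a check-then-act race: the task thread reads native RocksDB properties for metrics while the maintenance thread has already closed or reloaded the instance. Below is a minimal, self-contained Scala sketch of the locking pattern the patch adopts. It is not the real `RocksDB.acquire()`/`release()` protocol (which also tracks the owning thread and supports timed waits), and `MetricsSnapshot`/`readNativeProperties()` are placeholder names invented for illustration:

    ```
    // Hypothetical stand-ins for RocksDBMetrics and the native property reads.
    case class MetricsSnapshot(numCommittedKeys: Long)

    class LockedMetricsSketch {
      private val instanceLock = new Object
      // Snapshot recorded at commit time; cleared again on load(), as in the patch.
      private var recordedMetrics: Option[MetricsSnapshot] = None
      private var nativeDbOpen = true

      private def readNativeProperties(): MetricsSnapshot = {
        // Before the fix, reading properties off a closed native handle was the NPE site.
        require(nativeDbOpen, "native handle is closed")
        MetricsSnapshot(numCommittedKeys = 42L)
      }

      def load(): Unit = instanceLock.synchronized { recordedMetrics = None }

      def commit(): Unit = instanceLock.synchronized {
        // Metrics are captured while the DB is guaranteed open, then cached.
        recordedMetrics = Some(readNativeProperties())
      }

      // Background maintenance may close or reload the DB, but only under the same lock.
      def closeFromMaintenanceThread(): Unit = instanceLock.synchronized { nativeDbOpen = false }

      // The fix: expose a lock-guarded cached snapshot (possibly None) instead of
      // letting readers touch a possibly-closed native handle directly.
      def metricsOpt: Option[MetricsSnapshot] = instanceLock.synchronized { recordedMetrics }
    }
    ```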
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified existing unit tests
    
    ```
    [info] Run completed in 1 minute, 31 seconds.
    [info] Total number of tests run: 150
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
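
    For reference, these suites can be run locally with sbt; the exact invocation below is illustrative rather than taken from the PR:

    ```
    build/sbt "sql/testOnly org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite"
    build/sbt "sql/testOnly org.apache.spark.sql.execution.streaming.state.RocksDBSuite"
    ```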
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44165 from anishshri-db/task/SPARK-46249.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    |  30 +++++-
 .../state/RocksDBStateStoreProvider.scala          | 103 ++++++++++++---------
 .../streaming/state/RocksDBStateStoreSuite.scala   |  24 +++--
 .../execution/streaming/state/RocksDBSuite.scala   |  36 +++----
 4 files changed, 112 insertions(+), 81 deletions(-)
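
Before diving into the diff: on the caller side, `RocksDBStateStore.metrics` now consumes an Option and deliberately degrades to empty metrics when no committed snapshot is available. A simplified sketch of that shape (condensed from the provider change below; `customMetrics` stands in for the full custom-metric map built there):

```
def metrics: StateStoreMetrics = rocksDB.metricsOpt match {
  case Some(m) =>
    StateStoreMetrics(m.numUncommittedKeys, m.totalMemUsageBytes, customMetrics(m))
  case None =>
    // Store already closed/aborted/reloaded; report empty metrics instead of throwing.
    StateStoreMetrics(0, 0, Map.empty)
}
```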

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3a42f9e2ccb5..c33a7c472842 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -137,6 +137,11 @@ class RocksDB(
   @volatile private var numKeysOnWritingVersion = 0L
  @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
 
+  // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later
+  // Updates and access to recordedMetrics are protected by the DB instance lock
+  @GuardedBy("acquireLock")
+  @volatile private var recordedMetrics: Option[RocksDBMetrics] = None
+
   @GuardedBy("acquireLock")
   @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
 
@@ -148,6 +153,7 @@ class RocksDB(
   def load(version: Long, readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire()
+    recordedMetrics = None
     logInfo(s"Loading $version")
     try {
       if (loadedVersion != version) {
@@ -397,7 +403,8 @@ class RocksDB(
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
-      logInfo(s"Committed $newVersion, stats = ${metrics.json}")
+      recordedMetrics = Some(metrics)
+      logInfo(s"Committed $newVersion, stats = ${recordedMetrics.get.json}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -495,7 +502,7 @@ class RocksDB(
  def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache)
 
   /** Get current instantaneous statistics */
-  def metrics: RocksDBMetrics = {
+  private def metrics: RocksDBMetrics = {
     import HistogramType._
     val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
     val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
@@ -549,6 +556,25 @@ class RocksDB(
       nativeOpsMetrics = nativeOpsMetrics.toMap)
   }
 
+  /**
+   * Function to return RocksDB metrics if the recorded metrics are available and the operator
+   * has reached the commit stage for this state store instance and version. If not, we return None
+   * @return - Return RocksDBMetrics if available and None otherwise
+   */
+  def metricsOpt: Option[RocksDBMetrics] = {
+    var rocksDBMetricsOpt: Option[RocksDBMetrics] = None
+    try {
+      acquire()
+      rocksDBMetricsOpt = recordedMetrics
+    } catch {
+      case ex: Exception =>
+        logInfo(s"Failed to acquire metrics with exception=$ex")
+    } finally {
+      release()
+    }
+    rocksDBMetricsOpt
+  }
+
   private def acquire(): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
     val waitStartTime = System.currentTimeMillis
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 4254640201c5..9552e2c81bb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -110,52 +110,65 @@ private[sql] class RocksDBStateStoreProvider
     }
 
     override def metrics: StateStoreMetrics = {
-      val rocksDBMetrics = rocksDB.metrics
-      def commitLatencyMs(typ: String): Long = rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
-      def nativeOpsLatencyMillis(typ: String): Long = {
-        rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
-      }
-      def sumNativeOpsLatencyMillis(typ: String): Long = {
-        rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum / 1000).getOrElse(0)
-      }
-      def nativeOpsCount(typ: String): Long = {
-        rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0)
-      }
-      def nativeOpsMetrics(typ: String): Long = {
-        rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0)
-      }
+      val rocksDBMetricsOpt = rocksDB.metricsOpt
+
+      if (rocksDBMetricsOpt.isDefined) {
+        val rocksDBMetrics = rocksDBMetricsOpt.get
+
+        def commitLatencyMs(typ: String): Long =
+          rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
+
+        def nativeOpsLatencyMillis(typ: String): Long = {
+          rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
+        }
 
-      val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long](
-        CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes,
-        CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"),
-        CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
-        CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
-        CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
-        CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
-        CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
-        CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
-        CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
-        CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
-        CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
-        CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
-        CUSTOM_METRIC_BLOCK_CACHE_MISS -> nativeOpsMetrics("readBlockCacheMissCount"),
-        CUSTOM_METRIC_BLOCK_CACHE_HITS -> nativeOpsMetrics("readBlockCacheHitCount"),
-        CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"),
-        CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"),
-        CUSTOM_METRIC_ITERATOR_BYTES_READ -> nativeOpsMetrics("totalBytesReadThroughIterator"),
-        CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"),
-        CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"),
-        CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"),
-        CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"),
-        CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"),
-        CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage
-      ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
-        Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
-
-      StateStoreMetrics(
-        rocksDBMetrics.numUncommittedKeys,
-        rocksDBMetrics.totalMemUsageBytes,
-        stateStoreCustomMetrics)
+        def sumNativeOpsLatencyMillis(typ: String): Long = {
+          rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum / 1000).getOrElse(0)
+        }
+
+        def nativeOpsCount(typ: String): Long = {
+          rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0)
+        }
+
+        def nativeOpsMetrics(typ: String): Long = {
+          rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0)
+        }
+
+        val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long](
+          CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes,
+          CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"),
+          CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
+          CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
+          CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
+          CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
+          CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
+          CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
+          CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
+          CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
+          CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
+          CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
+          CUSTOM_METRIC_BLOCK_CACHE_MISS -> nativeOpsMetrics("readBlockCacheMissCount"),
+          CUSTOM_METRIC_BLOCK_CACHE_HITS -> nativeOpsMetrics("readBlockCacheHitCount"),
+          CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"),
+          CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"),
+          CUSTOM_METRIC_ITERATOR_BYTES_READ -> nativeOpsMetrics("totalBytesReadThroughIterator"),
+          CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"),
+          CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"),
+          CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"),
+          CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"),
+          CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"),
+          CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage
+        ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
+          Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
+
+        StateStoreMetrics(
+          rocksDBMetrics.numUncommittedKeys,
+          rocksDBMetrics.totalMemUsageBytes,
+          stateStoreCustomMetrics)
+      } else {
+        logInfo(s"Failed to collect metrics for store_id=$id and version=$version")
+        StateStoreMetrics(0, 0, Map.empty)
+      }
     }
 
     override def hasCommitted: Boolean = state == COMMITTED
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index a6e65825a5bc..3559a10444a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -131,19 +131,25 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
     withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
       tryWithProviderResource(newStoreProvider()) { provider =>
-          val store = provider.getStore(0)
-          // Verify state after updating
-          put(store, "a", 0, 1)
-          assert(get(store, "a", 0) === Some(1))
-          assert(store.commit() === 1)
-          provider.doMaintenance()
-          assert(store.hasCommitted)
-          val storeMetrics = store.metrics
-          assert(storeMetrics.numKeys === 1)
+        val store = provider.getStore(0)
+        // Verify state after updating
+        put(store, "a", 0, 1)
+        assert(get(store, "a", 0) === Some(1))
+        assert(store.commit() === 1)
+        provider.doMaintenance()
+        assert(store.hasCommitted)
+        val storeMetrics = store.metrics
+        assert(storeMetrics.numKeys === 1)
+        // SPARK-46249 - In the case of changelog checkpointing, the snapshot upload happens in
+        // the context of the background maintenance thread. The file manager metrics are updated
+        // here and will be available as part of the next metrics update. So we cannot rely on the
+        // file manager metrics to be available here for this version.
+        if (!isChangelogCheckpointingEnabled) {
          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 0L)
          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 0L)
          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L)
          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index e290f808f560..9ce2137df72c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -922,13 +922,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
       withDB(remoteDir) { db =>
-        verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
         db.load(0)
         db.put("a", "1") // put also triggers a db get
        db.get("a") // this is found in-memory writebatch - no get triggered in db
         db.get("b") // key doesn't exists - triggers db get
         db.commit()
-        verifyMetrics(putCount = 1, getCount = 3, metrics = db.metrics)
+        verifyMetrics(putCount = 1, getCount = 3, metrics = db.metricsOpt.get)
 
         db.load(1)
         db.put("b", "2") // put also triggers a db get
@@ -936,7 +935,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.get("c") // key doesn't exists - triggers db get
         assert(iterator(db).toSet === Set(("a", "1"), ("b", "2")))
         db.commit()
-        verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metrics)
+        verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metricsOpt.get)
       }
     }
 
@@ -944,19 +943,18 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
       withDB(remoteDir, conf = dbConf.copy(resetStatsOnLoad = false)) { db =>
-        verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
         db.load(0)
         db.put("a", "1") // put also triggers a db get
         db.commit()
         // put and get counts are cumulative
-        verifyMetrics(putCount = 1, getCount = 1, metrics = db.metrics)
+        verifyMetrics(putCount = 1, getCount = 1, metrics = db.metricsOpt.get)
 
         db.load(1)
         db.put("b", "2") // put also triggers a db get
         db.get("a")
         db.commit()
        // put and get counts are cumulative: existing get=1, put=1: new get=2, put=1
-        verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
+        verifyMetrics(putCount = 2, getCount = 3, metrics = db.metricsOpt.get)
       }
     }
 
@@ -974,7 +972,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.put("b", "25")
         db.commit()
 
-        val metrics = db.metrics
+        val metrics = db.metricsOpt.get
         assert(metrics.nativeOpsHistograms("compaction").count > 0)
         assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0)
         assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0)
@@ -1194,13 +1192,10 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.put("a", "5")
         db.put("b", "5")
 
-        assert(db.metrics.numUncommittedKeys === 2)
-        assert(db.metrics.numCommittedKeys === 0)
-
         curVersion = db.commit()
 
-        assert(db.metrics.numUncommittedKeys === 2)
-        assert(db.metrics.numCommittedKeys === 2)
+        assert(db.metricsOpt.get.numUncommittedKeys === 2)
+        assert(db.metricsOpt.get.numCommittedKeys === 2)
       }
 
       // restart with config "trackTotalNumberOfRows = false"
@@ -1208,16 +1203,13 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = false)) { db =>
         db.load(curVersion)
 
-        assert(db.metrics.numUncommittedKeys === -1)
-        assert(db.metrics.numCommittedKeys === -1)
-
         db.put("b", "7")
         db.put("c", "7")
 
         curVersion = db.commit()
 
-        assert(db.metrics.numUncommittedKeys === -1)
-        assert(db.metrics.numCommittedKeys === -1)
+        assert(db.metricsOpt.get.numUncommittedKeys === -1)
+        assert(db.metricsOpt.get.numCommittedKeys === -1)
       }
 
       // restart with config "trackTotalNumberOfRows = true" again
@@ -1225,19 +1217,13 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db =>
         db.load(curVersion)
 
-        assert(db.metrics.numUncommittedKeys === 3)
-        assert(db.metrics.numCommittedKeys === 3)
-
         db.put("c", "8")
         db.put("d", "8")
 
-        assert(db.metrics.numUncommittedKeys === 4)
-        assert(db.metrics.numCommittedKeys === 3)
-
         curVersion = db.commit()
 
-        assert(db.metrics.numUncommittedKeys === 4)
-        assert(db.metrics.numCommittedKeys === 4)
+        assert(db.metricsOpt.get.numUncommittedKeys === 4)
+        assert(db.metricsOpt.get.numCommittedKeys === 4)
       }
     }
   }

