This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1d80d80a56c4 [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations 1d80d80a56c4 is described below commit 1d80d80a56c418f841e282ad753fad6671c3baae Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Tue Dec 5 15:00:08 2023 +0900 [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations ### What changes were proposed in this pull request? Require instance lock for acquiring RocksDB metrics to prevent race with background operations ### Why are the changes needed? The changes are needed to avoid races where the statefulOperator tries to set storeMetrics after the commit and the DB instance has already been closed/aborted/reloaded. We have seen a few query failures with the following stack trace due to this reason: ``` org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent failure: Lost task 3.0 in stage 531.0 (TID 1544) (ip-10-110-29-251.us-west-2.compute.internal executor driver): java.lang.NullPointerException at org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838) at org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678) at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137) at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198) at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626) at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Modified existing unit tests ``` [info] Run completed in 1 minute, 31 seconds. [info] Total number of tests run: 150 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #44165 from anishshri-db/task/SPARK-46249. Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 30 +++++- .../state/RocksDBStateStoreProvider.scala | 103 ++++++++++++--------- .../streaming/state/RocksDBStateStoreSuite.scala | 24 +++-- .../execution/streaming/state/RocksDBSuite.scala | 36 +++---- 4 files changed, 112 insertions(+), 81 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 3a42f9e2ccb5..c33a7c472842 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -137,6 +137,11 @@ class RocksDB( @volatile private var numKeysOnWritingVersion = 0L @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS + // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later + // Updates and access to recordedMetrics are protected by the DB instance lock + @GuardedBy("acquireLock") + @volatile private var recordedMetrics: Option[RocksDBMetrics] = None + @GuardedBy("acquireLock") @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _ @@ -148,6 +153,7 @@ class RocksDB( def load(version: Long, readOnly: Boolean = false): RocksDB = { assert(version >= 0) acquire() + recordedMetrics = None logInfo(s"Loading $version") try { if (loadedVersion != version) { @@ -397,7 +403,8 @@ class RocksDB( "checkpoint" -> checkpointTimeMs, "fileSync" -> fileSyncTimeMs ) - logInfo(s"Committed $newVersion, stats = ${metrics.json}") + recordedMetrics = Some(metrics) + logInfo(s"Committed $newVersion, stats = ${recordedMetrics.get.json}") loadedVersion } catch { case t: Throwable => @@ -495,7 +502,7 @@ class RocksDB( def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache) /** Get current instantaneous statistics */ - def metrics: RocksDBMetrics = { + private def metrics: RocksDBMetrics = { import HistogramType._ val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size") val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem") @@ -549,6 +556,25 @@ class RocksDB( nativeOpsMetrics = nativeOpsMetrics.toMap) } + /** + * Function to return RocksDB metrics if the recorded metrics are available and the operator + * has reached the commit stage for this state store instance and version. If not, we return None + * @return - Return RocksDBMetrics if available and None otherwise + */ + def metricsOpt: Option[RocksDBMetrics] = { + var rocksDBMetricsOpt: Option[RocksDBMetrics] = None + try { + acquire() + rocksDBMetricsOpt = recordedMetrics + } catch { + case ex: Exception => + logInfo(s"Failed to acquire metrics with exception=$ex") + } finally { + release() + } + rocksDBMetricsOpt + } + private def acquire(): Unit = acquireLock.synchronized { val newAcquiredThreadInfo = AcquiredThreadInfo() val waitStartTime = System.currentTimeMillis diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 4254640201c5..9552e2c81bb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -110,52 +110,65 @@ private[sql] class RocksDBStateStoreProvider } override def metrics: StateStoreMetrics = { - val rocksDBMetrics = rocksDB.metrics - def commitLatencyMs(typ: String): Long = rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L) - def nativeOpsLatencyMillis(typ: String): Long = { - rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0) - } - def sumNativeOpsLatencyMillis(typ: String): Long = { - rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum / 1000).getOrElse(0) - } - def nativeOpsCount(typ: String): Long = { - rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0) - } - def nativeOpsMetrics(typ: String): Long = { - rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0) - } + val rocksDBMetricsOpt = rocksDB.metricsOpt + + if (rocksDBMetricsOpt.isDefined) { + val rocksDBMetrics = rocksDBMetricsOpt.get + + def commitLatencyMs(typ: String): Long = + rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L) + + def nativeOpsLatencyMillis(typ: String): Long = { + rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0) + } - val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long]( - CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes, - CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"), - CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"), - CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"), - CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"), - CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"), - CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"), - CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"), - CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"), - CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, - CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied, - CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused, - CUSTOM_METRIC_BLOCK_CACHE_MISS -> nativeOpsMetrics("readBlockCacheMissCount"), - CUSTOM_METRIC_BLOCK_CACHE_HITS -> nativeOpsMetrics("readBlockCacheHitCount"), - CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"), - CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"), - CUSTOM_METRIC_ITERATOR_BYTES_READ -> nativeOpsMetrics("totalBytesReadThroughIterator"), - CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"), - CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"), - CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"), - CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"), - CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"), - CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage - ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes => - Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map()) - - StateStoreMetrics( - rocksDBMetrics.numUncommittedKeys, - rocksDBMetrics.totalMemUsageBytes, - stateStoreCustomMetrics) + def sumNativeOpsLatencyMillis(typ: String): Long = { + rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum / 1000).getOrElse(0) + } + + def nativeOpsCount(typ: String): Long = { + rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0) + } + + def nativeOpsMetrics(typ: String): Long = { + rocksDBMetrics.nativeOpsMetrics.getOrElse(typ, 0) + } + + val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long]( + CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes, + CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"), + CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"), + CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"), + CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"), + CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"), + CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"), + CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"), + CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"), + CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, + CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied, + CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused, + CUSTOM_METRIC_BLOCK_CACHE_MISS -> nativeOpsMetrics("readBlockCacheMissCount"), + CUSTOM_METRIC_BLOCK_CACHE_HITS -> nativeOpsMetrics("readBlockCacheHitCount"), + CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"), + CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"), + CUSTOM_METRIC_ITERATOR_BYTES_READ -> nativeOpsMetrics("totalBytesReadThroughIterator"), + CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"), + CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"), + CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"), + CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"), + CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"), + CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage + ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes => + Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map()) + + StateStoreMetrics( + rocksDBMetrics.numUncommittedKeys, + rocksDBMetrics.totalMemUsageBytes, + stateStoreCustomMetrics) + } else { + logInfo(s"Failed to collect metrics for store_id=$id and version=$version") + StateStoreMetrics(0, 0, Map.empty) + } } override def hasCommitted: Boolean = state == COMMITTED diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index a6e65825a5bc..3559a10444a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -131,19 +131,25 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") { tryWithProviderResource(newStoreProvider()) { provider => - val store = provider.getStore(0) - // Verify state after updating - put(store, "a", 0, 1) - assert(get(store, "a", 0) === Some(1)) - assert(store.commit() === 1) - provider.doMaintenance() - assert(store.hasCommitted) - val storeMetrics = store.metrics - assert(storeMetrics.numKeys === 1) + val store = provider.getStore(0) + // Verify state after updating + put(store, "a", 0, 1) + assert(get(store, "a", 0) === Some(1)) + assert(store.commit() === 1) + provider.doMaintenance() + assert(store.hasCommitted) + val storeMetrics = store.metrics + assert(storeMetrics.numKeys === 1) + // SPARK-46249 - In the case of changelog checkpointing, the snapshot upload happens in + // the context of the background maintenance thread. The file manager metrics are updated + // here and will be available as part of the next metrics update. So we cannot rely on the + // file manager metrics to be available here for this version. + if (!isChangelogCheckpointingEnabled) { assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 0L) assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 0L) assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L) assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index e290f808f560..9ce2137df72c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -922,13 +922,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared withTempDir { dir => val remoteDir = dir.getCanonicalPath withDB(remoteDir) { db => - verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics) db.load(0) db.put("a", "1") // put also triggers a db get db.get("a") // this is found in-memory writebatch - no get triggered in db db.get("b") // key doesn't exists - triggers db get db.commit() - verifyMetrics(putCount = 1, getCount = 3, metrics = db.metrics) + verifyMetrics(putCount = 1, getCount = 3, metrics = db.metricsOpt.get) db.load(1) db.put("b", "2") // put also triggers a db get @@ -936,7 +935,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.get("c") // key doesn't exists - triggers db get assert(iterator(db).toSet === Set(("a", "1"), ("b", "2"))) db.commit() - verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metrics) + verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metricsOpt.get) } } @@ -944,19 +943,18 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared withTempDir { dir => val remoteDir = dir.getCanonicalPath withDB(remoteDir, conf = dbConf.copy(resetStatsOnLoad = false)) { db => - verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics) db.load(0) db.put("a", "1") // put also triggers a db get db.commit() // put and get counts are cumulative - verifyMetrics(putCount = 1, getCount = 1, metrics = db.metrics) + verifyMetrics(putCount = 1, getCount = 1, metrics = db.metricsOpt.get) db.load(1) db.put("b", "2") // put also triggers a db get db.get("a") db.commit() // put and get counts are cumulative: existing get=1, put=1: new get=2, put=1 - verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics) + verifyMetrics(putCount = 2, getCount = 3, metrics = db.metricsOpt.get) } } @@ -974,7 +972,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.put("b", "25") db.commit() - val metrics = db.metrics + val metrics = db.metricsOpt.get assert(metrics.nativeOpsHistograms("compaction").count > 0) assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0) assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0) @@ -1194,13 +1192,10 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.put("a", "5") db.put("b", "5") - assert(db.metrics.numUncommittedKeys === 2) - assert(db.metrics.numCommittedKeys === 0) - curVersion = db.commit() - assert(db.metrics.numUncommittedKeys === 2) - assert(db.metrics.numCommittedKeys === 2) + assert(db.metricsOpt.get.numUncommittedKeys === 2) + assert(db.metricsOpt.get.numCommittedKeys === 2) } // restart with config "trackTotalNumberOfRows = false" @@ -1208,16 +1203,13 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = false)) { db => db.load(curVersion) - assert(db.metrics.numUncommittedKeys === -1) - assert(db.metrics.numCommittedKeys === -1) - db.put("b", "7") db.put("c", "7") curVersion = db.commit() - assert(db.metrics.numUncommittedKeys === -1) - assert(db.metrics.numCommittedKeys === -1) + assert(db.metricsOpt.get.numUncommittedKeys === -1) + assert(db.metricsOpt.get.numCommittedKeys === -1) } // restart with config "trackTotalNumberOfRows = true" again @@ -1225,19 +1217,13 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db => db.load(curVersion) - assert(db.metrics.numUncommittedKeys === 3) - assert(db.metrics.numCommittedKeys === 3) - db.put("c", "8") db.put("d", "8") - assert(db.metrics.numUncommittedKeys === 4) - assert(db.metrics.numCommittedKeys === 3) - curVersion = db.commit() - assert(db.metrics.numUncommittedKeys === 4) - assert(db.metrics.numCommittedKeys === 4) + assert(db.metricsOpt.get.numUncommittedKeys === 4) + assert(db.metricsOpt.get.numCommittedKeys === 4) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org