This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ef33b9c50806 [SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load
ef33b9c50806 is described below

commit ef33b9c50806475f287267c05278aeda3645abac
Author: Bhuwan Sahni <bhuwan.sa...@databricks.com>
AuthorDate: Wed Jan 24 21:35:33 2024 +0900

    [SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load
    
    This PR ensures that RocksDB loads do not run into the SST file version ID mismatch issue. RocksDB has added validation to ensure the exact same SST file is used when a database is loaded from a snapshot. The current streaming state implementation suffers from certain edge cases where this condition is violated, resulting in state load failure.
    
    The changes introduced are:
    
    1. Ensure that the local SST file is exactly the same DFS file (as per the mapping in metadata.zip). We keep track of the DFS file path for each local SST file, and re-download the SST file if the DFS file has a different UUID in metadata.zip.
    2. Reset lastSnapshotVersion in RocksDB when RocksDB is loaded. Changelog checkpointing relies on this version for future snapshots. Currently, if an older version is reloaded, we do not upload snapshots because lastSnapshotVersion points to a higher snapshot of a cleaned-up database.
    
    We need to ensure that the correct SST files are used on the executor during RocksDB load, as per the mapping in metadata.zip. With the current implementation, it is possible that the executor uses an SST file (with a different UUID) from an older version which is not the exact file mapped in metadata.zip. This can cause version ID mismatch errors while loading RocksDB, leading to streaming query failures.
    
    See https://issues.apache.org/jira/browse/SPARK-46796 for failure scenarios.
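
    As an illustration of change 1 (a minimal sketch only; resolveDfsFile, the upload callback, and the simplified RocksDBImmutableFile case class below are hypothetical stand-ins for the real code in the diff), the reuse decision keys on the local file name and only skips the upload when the previously recorded DFS file still matches the local file's size:

        import java.util.concurrent.ConcurrentHashMap

        // Simplified stand-in for the real RocksDBImmutableFile (local name, DFS name, size).
        case class RocksDBImmutableFile(localFileName: String, dfsFileName: String, sizeBytes: Long)

        // Tracks the exact DFS file that was used to create each local SST file.
        val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]()

        // Decide whether a local SST file can reuse its previously uploaded DFS copy.
        def resolveDfsFile(localFileName: String, localFileSize: Long)
            (upload: String => RocksDBImmutableFile): RocksDBImmutableFile = {
          Option(localFilesToDfsFiles.get(localFileName)) match {
            case Some(dfsFile) if dfsFile.sizeBytes == localFileSize =>
              dfsFile // same file as before: reuse without copying
            case _ =>
              val uploaded = upload(localFileName) // copy to DFS under a new UUID-suffixed name
              localFilesToDfsFiles.put(localFileName, uploaded)
              uploaded
          }
        }

    Change 2 simply resets lastSnapshotVersion when a version is loaded, so changelog checkpointing uploads a fresh snapshot after time travel, as shown in the RocksDB.scala hunk below.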
    
    No
    
    Added exhaustive unit test cases covering the scenarios.
    
    No
    
    Closes #44837 from sahnib/SPARK-46796.
    
    Authored-by: Bhuwan Sahni <bhuwan.sa...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit f25ebe52b9b84ece9b3c5ae30b83eaaef52ec55b)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    |   3 +
 .../streaming/state/RocksDBFileManager.scala       |  92 ++++--
 .../execution/streaming/state/RocksDBSuite.scala   | 314 ++++++++++++++++++++-
 3 files changed, 372 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2398b7780726..0c9738a6b081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -151,6 +151,8 @@ class RocksDB(
         val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
         loadedVersion = latestSnapshotVersion
 
+        // reset last snapshot version
+        lastSnapshotVersion = 0L
         openDB()
 
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
@@ -191,6 +193,7 @@ class RocksDB(
    */
   private def replayChangelog(endVersion: Long): Unit = {
     for (v <- loadedVersion + 1 to endVersion) {
+      logInfo(s"replaying changelog from version $loadedVersion -> 
$endVersion")
       var changelogReader: StateStoreChangelogReader = null
       try {
         changelogReader = fileManager.getChangelogReader(v)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index faf9cd701aec..300a3b8137b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -132,6 +132,15 @@ class RocksDBFileManager(
   import RocksDBImmutableFile._
 
   private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+
+
+  // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
+  // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
+  // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
+  // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
+  // across SST files and RocksDB manifest.
+  private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
   private val onlyZipFiles = new PathFilter {
@@ -213,6 +222,7 @@ class RocksDBFileManager(
     versionToRocksDBFiles.keySet().removeIf(_ >= version)
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
+      localFilesToDfsFiles.clear()
       localDir.mkdirs()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
@@ -449,44 +459,54 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
-      .values.flatten.map { f =>
-      f.localFileName -> f
-    }.toMap
 
     var bytesCopied = 0L
     var filesCopied = 0L
     var filesReused = 0L
 
     val immutableFiles = localFiles.map { localFile =>
-      prevFilesToSizes
-        .get(localFile.getName)
-        .filter(_.isSameFile(localFile))
-        .map { reusable =>
-          filesReused += 1
-          reusable
-        }.getOrElse {
-          val localFileName = localFile.getName
-          val dfsFileName = newDFSFileName(localFileName)
-          val dfsFile = dfsFilePath(dfsFileName)
-          // Note: The implementation of copyFromLocalFile() closes the output stream when there is
-          // any exception while copying. So this may generate partial files on DFS. But that is
-          // okay because until the main [version].zip file is written, those partial files are
-          // not going to be used at all. Eventually these files should get cleared.
-          fs.copyFromLocalFile(
-            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
-          val localFileSize = localFile.length()
-          logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
-          filesCopied += 1
-          bytesCopied += localFileSize
-
-          RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
-        }
+      val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+      if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
+        val dfsFile = existingDfsFile.get
+        filesReused += 1
+        logInfo(s"reusing file $dfsFile for $localFile")
+        RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes)
+      } else {
+        val localFileName = localFile.getName
+        val dfsFileName = newDFSFileName(localFileName)
+        val dfsFile = dfsFilePath(dfsFileName)
+        // Note: The implementation of copyFromLocalFile() closes the output stream when there is
+        // any exception while copying. So this may generate partial files on DFS. But that is
+        // okay because until the main [version].zip file is written, those partial files are
+        // not going to be used at all. Eventually these files should get cleared.
+        fs.copyFromLocalFile(
+          new Path(localFile.getAbsoluteFile.toURI), dfsFile)
+        val localFileSize = localFile.length()
+        logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+        filesCopied += 1
+        bytesCopied += localFileSize
+
+        val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
+        localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+
+        immutableDfsFile
+      }
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
 
+    // clean up deleted SST files from the localFilesToDfsFiles Map
+    val currentLocalFiles = localFiles.map(_.getName).toSet
+    val mappingsToClean = localFilesToDfsFiles.asScala
+      .keys
+      .filterNot(currentLocalFiles.contains)
+
+    mappingsToClean.foreach { f =>
+      logInfo(s"cleaning $f from the localFilesToDfsFiles map")
+      localFilesToDfsFiles.remove(f)
+    }
+
     saveCheckpointMetrics = RocksDBFileManagerMetrics(
       bytesCopied = bytesCopied,
       filesCopied = filesCopied,
@@ -506,11 +526,22 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     listRocksDBFiles(localDir)._1
       .foreach { existingFile =>
-        val isSameFile =
-          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
+        val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+        val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
+          requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
+            existingFile.length() == requiredFile.get.sizeBytes
+        } else {
+          false
+        }
+
         if (!isSameFile) {
           existingFile.delete()
-          logInfo(s"Deleted local file $existingFile")
+          localFilesToDfsFiles.remove(existingFile.getName)
+          logInfo(s"Deleted local file $existingFile with size 
${existingFile.length()} mapped" +
+            s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
+        } else {
+          logInfo(s"reusing $prevDfsFile present at $existingFile for 
$requiredFile")
         }
       }
 
@@ -536,6 +567,7 @@ class RocksDBFileManager(
         }
         filesCopied += 1
         bytesCopied += localFileSize
+        localFilesToDfsFiles.put(localFileName, file)
         logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
       } else {
         filesReused += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 91dd85822071..04b11dfe43f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,16 +24,36 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.{ThreadUtils, Utils}
 
+class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
+
+  override def createAtomic(path: Path,
+                            overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path,
+                              overwriteIfPossible: Boolean): Unit = {
+    if (!fs.exists(dstPath)) {
+      // only write if a file does not exist at this location
+      super.renameTempFile(srcPath, dstPath, overwriteIfPossible)
+    }
+  }
+}
+
 trait RocksDBStateStoreChangelogCheckpointingTestUtil {
   val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
     ".changelogCheckpointing.enabled"
@@ -666,19 +686,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
       // Save SAME version again with different checkpoint files and load back again to verify
       // whether files were overwritten.
       val cpFiles1_ = Seq(
-        "sst-file1.sst" -> 10, // same SST file as before, but same version, 
so should get copied
+        "sst-file1.sst" -> 10, // same SST file as before, this should get 
reused
         "sst-file2.sst" -> 25, // new SST file with same name as before, but 
different length
         "sst-file3.sst" -> 30, // new SST file
         "other-file1" -> 100, // same non-SST file as before, should not get 
copied
         "other-file2" -> 210, // new non-SST file with same name as before, 
but different length
         "other-file3" -> 300, // new non-SST file
-        "archive/00001.log" -> 1000, // same log file as before and version, 
so should get copied
+        "archive/00001.log" -> 1000, // same log file as before, this should 
get reused
         "archive/00002.log" -> 2500, // new log file with same name as before, 
but different length
         "archive/00003.log" -> 3000 // new log file
       )
       saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-      assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
-      assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
+      assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
+      assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
       loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
 
       // Save another version and verify
@@ -688,8 +708,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         "archive/00004.log" -> 4000
       )
       saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-      assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
-      assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
+      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
       loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)
 
       // Loading an older version should work
@@ -1152,6 +1172,286 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("time travel - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // simulate db in another executor that override the zip file
+      withDB(remoteDir, conf = conf) { db1 =>
+        for (version <- 0 to 1) {
+          db1.load(version)
+          db1.put(version.toString, version.toString)
+          db1.commit()
+        }
+        db1.doMaintenance()
+      }
+      db.load(2)
+      for (version <- Seq(2)) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 3.zip
+      db.doMaintenance()
+      // rollback to version 2
+      db.load(2)
+    }
+  }
+
+  test("time travel 2 - validate successful RocksDB load") {
+    Seq(1, 2).map(minDeltasForSnapshot => {
+      val remoteDir = Utils.createTempDir().toString
+      val conf = dbConf.copy(minDeltasForSnapshot = minDeltasForSnapshot,
+        compactOnCommit = false)
+      new File(remoteDir).delete() // to make sure that the directory gets created
+      withDB(remoteDir, conf = conf) { db =>
+        for (version <- 0 to 1) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 2.zip
+        db.doMaintenance()
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        db.load(0)
+        // simulate db in another executor that override the zip file
+        withDB(remoteDir, conf = conf) { db1 =>
+          for (version <- 0 to 1) {
+            db1.load(version)
+            db1.put(version.toString, version.toString)
+            db1.commit()
+          }
+          db1.doMaintenance()
+        }
+        for (version <- 2 to 3) {
+          db.load(version)
+          db.put(version.toString, version.toString)
+          db.commit()
+        }
+        // upload snapshot 4.zip
+        db.doMaintenance()
+      }
+      withDB(remoteDir, version = 4, conf = conf) { db =>
+      }
+    })
+  }
+
+  test("time travel 3 - validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 0, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 2) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 2.zip
+      db.doMaintenance()
+      for (version <- 1 to 3) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+      // upload snapshot 4.zip
+      db.doMaintenance()
+    }
+
+    withDB(remoteDir, version = 4, conf = conf) { db =>
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is not overwritten - scenario 1") {
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is overwritten - scenario 1") {
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is not overwritten - scenario 2") {
+    val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+      "NoOverwriteFileSystemBasedCheckpointFileManager"
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
+  test("validate Rocks DB SST files do not have a VersionIdMismatch" +
+    " when metadata file is overwritten - scenario 2") {
+    withTempDir { dir =>
+      val dbConf = RocksDBConf(StateStoreConf(new SQLConf()))
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db1 =>
+        withDB(remoteDir, conf = dbConf, hadoopConf = hadoopConf) { db2 =>
+          // commit version 1 via db2
+          db2.load(0)
+          db2.put("a", "1")
+          db2.put("b", "1")
+
+          db2.commit()
+
+          // commit version 1 via db1
+          db1.load(0)
+          db1.put("a", "1")
+          db1.put("b", "1")
+
+          db1.commit()
+
+          // commit version 2 via db2
+          db2.load(1)
+          db2.put("a", "2")
+          db2.put("b", "2")
+
+          db2.commit()
+
+          // reload version 1, this should succeed
+          db2.load(1)
+          db1.load(1)
+
+          // reload version 2, this should succeed
+          db2.load(2)
+          db1.load(2)
+        }
+      }
+    }
+  }
+
   private def sqlConf = SQLConf.get.clone()
 
   private def dbConf = RocksDBConf(StateStoreConf(sqlConf))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
