viirya commented on a change in pull request #32928:
URL: https://github.com/apache/spark/pull/32928#discussion_r655929725



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util

Review comment:
       `sql.util`? why not in `org.apache.spark.sql.execution.streaming`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A wrapper for RocksDB library loading using an uninterruptible thread, as 
the native RocksDB
+ * code will throw an error when interrupted.
+ */
+object RocksDBLoader extends Logging {
+  /**
+   * Keep tracks of the exception thrown from the loading thread, if any.
+   */
+  private var exception: Option[Throwable] = null
+
+  private val loadLibraryThread = new UninterruptibleThread("RocksDB") {

Review comment:
       RocksDB -> RocksDBLoader

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -153,6 +156,49 @@ class RocksDBFileManager(
     logInfo(s"Saved checkpoint file for version $version")
   }
 
+  /**
+   * Load all necessary files for specific checkpoint version from DFS to 
given local directory.
+   * If version is 0, then it will deleted all files in the directory. For 
other versions, it
+   * ensures that only the exact files generated during checkpointing will be 
present in the
+   * local directory.
+   */
+  def loadCheckpointFromDfs(version: Long, localDir: File): 
RocksDBCheckpointMetadata = {

Review comment:
       Is this basically the same as #32767, right?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.
+   */
+  def get(key: Array[Byte]): Array[Byte] = {
+    writeBatch.getFromBatchAndDB(db, readOptions, key)
+  }
+
+  /**
+   * Put the given value for the given key.

Review comment:
       `and return the last written value`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.
+   */
+  def get(key: Array[Byte]): Array[Byte] = {
+    writeBatch.getFromBatchAndDB(db, readOptions, key)
+  }
+
+  /**
+   * Put the given value for the given key.
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
+    val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    writeBatch.put(key, value)
+    if (oldValue == null) {
+      numUncommittedKeys += 1
+    }
+    oldValue
+  }
+
+  /**
+   * Remove the key if present, and return the previous value if it was 
present (null otherwise).
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def remove(key: Array[Byte]): Array[Byte] = {
+    val value = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    if (value != null) {
+      writeBatch.remove(key)
+      numUncommittedKeys -= 1

Review comment:
       Same here, if the value could be from committed db, how can we always 
decrease `numUncommittedKeys`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.
+   */
+  def get(key: Array[Byte]): Array[Byte] = {
+    writeBatch.getFromBatchAndDB(db, readOptions, key)
+  }
+
+  /**
+   * Put the given value for the given key.
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
+    val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    writeBatch.put(key, value)
+    if (oldValue == null) {
+      numUncommittedKeys += 1
+    }
+    oldValue
+  }
+
+  /**
+   * Remove the key if present, and return the previous value if it was 
present (null otherwise).
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def remove(key: Array[Byte]): Array[Byte] = {
+    val value = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    if (value != null) {
+      writeBatch.remove(key)
+      numUncommittedKeys -= 1
+    }
+    value
+  }
+
+  /**
+   * Get an iterator of all committed and uncommitted key-value pairs.
+   */
+  def iterator(): Iterator[ByteArrayPair] = {
+    val iter = writeBatch.newIteratorWithBase(db.newIterator())
+    logInfo(s"Getting iterator from version $loadedVersion")
+    iter.seekToFirst()
+
+    // Attempt to close this iterator if there is a task failure, or a task 
interruption.
+    // This is a hack because it assumes that the RocksDB inside running in a 
task.
+    Option(TaskContext.get()).foreach { tc =>
+      tc.addTaskCompletionListener[Unit] { _ => iter.close() }
+    }
+
+    new NextIterator[ByteArrayPair] {
+      override protected def getNext(): ByteArrayPair = {
+        if (iter.isValid) {
+          byteArrayPair.set(iter.key, iter.value)
+          iter.next()
+          byteArrayPair
+        } else {
+          finished = true
+          iter.close()
+          null
+        }
+      }
+      override protected def close(): Unit = { iter.close() }
+    }
+  }
+
+  /**
+   * Commit all the updates made as a version to DFS. The steps it needs to do 
to commits are:
+   * - Write all the updates to the native RocksDB
+   * - Flush all changes to disk
+   * - Create a RocksDB checkpoint in a new local dir
+   * - Sync the checkpoint dir files to DFS
+   */
+  def commit(): Long = {
+    val newVersion = loadedVersion + 1
+    val checkpointDir = createTempDir("checkpoint")
+    try {
+      // Make sure the directory does not exist. Native RocksDB fails if the 
directory to
+      // checkpoint exists.
+      Utils.deleteRecursively(checkpointDir)
+
+      logInfo(s"Writing updates for $newVersion")
+      val writeTimeMs = timeTakenMs { db.write(writeOptions, writeBatch) }
+
+      logInfo(s"Flushing updates for $newVersion")
+      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+
+      val compactTimeMs = if (conf.compactOnCommit) {
+        logInfo(s"Compacting")
+        timeTakenMs { db.compactRange() }
+      } else 0
+      logInfo("Pausing background work")
+
+      val pauseTimeMs = timeTakenMs {
+        db.pauseBackgroundWork() // To avoid files being changed while 
committing
+      }
+
+      logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+      val checkpointTimeMs = timeTakenMs {
+        val cp = Checkpoint.create(db)
+        cp.createCheckpoint(checkpointDir.toString)
+      }
+
+      logInfo(s"Syncing checkpoint for $newVersion in $checkpointDir")

Review comment:
       `in $checkpointDir` -> "to DFS"?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()

Review comment:
       `openDB()` will close DB. This looks redundant.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key

Review comment:
       Can these options be configured in the future?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.
+   */
+  def get(key: Array[Byte]): Array[Byte] = {
+    writeBatch.getFromBatchAndDB(db, readOptions, key)
+  }
+
+  /**
+   * Put the given value for the given key.
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
+    val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    writeBatch.put(key, value)
+    if (oldValue == null) {
+      numUncommittedKeys += 1
+    }
+    oldValue
+  }
+
+  /**
+   * Remove the key if present, and return the previous value if it was 
present (null otherwise).
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def remove(key: Array[Byte]): Array[Byte] = {
+    val value = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    if (value != null) {
+      writeBatch.remove(key)
+      numUncommittedKeys -= 1
+    }
+    value
+  }
+
+  /**
+   * Get an iterator of all committed and uncommitted key-value pairs.
+   */
+  def iterator(): Iterator[ByteArrayPair] = {
+    val iter = writeBatch.newIteratorWithBase(db.newIterator())
+    logInfo(s"Getting iterator from version $loadedVersion")
+    iter.seekToFirst()
+
+    // Attempt to close this iterator if there is a task failure, or a task 
interruption.
+    // This is a hack because it assumes that the RocksDB inside running in a 
task.
+    Option(TaskContext.get()).foreach { tc =>
+      tc.addTaskCompletionListener[Unit] { _ => iter.close() }
+    }
+
+    new NextIterator[ByteArrayPair] {
+      override protected def getNext(): ByteArrayPair = {
+        if (iter.isValid) {
+          byteArrayPair.set(iter.key, iter.value)
+          iter.next()
+          byteArrayPair
+        } else {
+          finished = true
+          iter.close()
+          null
+        }
+      }
+      override protected def close(): Unit = { iter.close() }
+    }
+  }
+
+  /**
+   * Commit all the updates made as a version to DFS. The steps it needs to do 
to commits are:
+   * - Write all the updates to the native RocksDB
+   * - Flush all changes to disk
+   * - Create a RocksDB checkpoint in a new local dir
+   * - Sync the checkpoint dir files to DFS
+   */
+  def commit(): Long = {
+    val newVersion = loadedVersion + 1
+    val checkpointDir = createTempDir("checkpoint")
+    try {
+      // Make sure the directory does not exist. Native RocksDB fails if the 
directory to
+      // checkpoint exists.
+      Utils.deleteRecursively(checkpointDir)
+
+      logInfo(s"Writing updates for $newVersion")
+      val writeTimeMs = timeTakenMs { db.write(writeOptions, writeBatch) }
+
+      logInfo(s"Flushing updates for $newVersion")
+      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+
+      val compactTimeMs = if (conf.compactOnCommit) {
+        logInfo(s"Compacting")

Review comment:
       nit: don't need s"".

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)

Review comment:
       manager -> fileManager?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()

Review comment:
       `createTempDir` will mkdir, no?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.
+   */
+  def get(key: Array[Byte]): Array[Byte] = {
+    writeBatch.getFromBatchAndDB(db, readOptions, key)
+  }
+
+  /**
+   * Put the given value for the given key.
+   * @note This update is not committed to disk until commit() is called.
+   */
+  def put(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
+    val oldValue = writeBatch.getFromBatchAndDB(db, readOptions, key)
+    writeBatch.put(key, value)
+    if (oldValue == null) {
+      numUncommittedKeys += 1
+    }

Review comment:
       If old value is not null, couldn't it be from committed db? How do we 
know it must be uncommitted one (and skipping increasing `numUncommittedKeys`)?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys

Review comment:
       This looks confusing. Why committed and uncommitted keys are both 
`numKeys`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.util.Locale
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.rocksdb.{RocksDB => NativeRocksDB, _}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.util.RocksDBLoader
+import org.apache.spark.util.{NextIterator, Utils}
+
+/**
+ * Class representing a RocksDB instance that checkpoints version of data to 
DFS.
+ * After a set of updates, a new version can be committed by calling 
`commit()`.
+ * Any past version can be loaded by calling `load(version)`.
+ *
+ * @note This class is not thread-safe, so use it only from one thread.
+ * @see [[RocksDBFileManager]] to see how the files are laid out in local disk 
and DFS.
+ * @param dfsRootDir  Remote directory where checkpoints are going to be 
written
+ * @param conf         Configuration for RocksDB
+ * @param localRootDir Root directory in local disk that is used to working 
and checkpoing dirs
+ * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
+ * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ */
+class RocksDB(
+    dfsRootDir: String,
+    val conf: RocksDBConf = RocksDBConf(),
+    localRootDir: File = Utils.createTempDir(),
+    hadoopConf: Configuration = new Configuration,
+    loggingId: String = "") extends Logging {
+
+  RocksDBLoader.loadLibrary()
+
+  // Java wrapper objects linking to native RocksDB objects
+  private val readOptions = new ReadOptions()  // used for gets
+  private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
+  private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
+  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+
+  private val bloomFilter = new BloomFilter()
+  private val tableFormatConfig = new BlockBasedTableConfig()
+  tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
+  tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 
1024))
+  tableFormatConfig.setFilterPolicy(bloomFilter)
+
+  private val dbOptions = new Options() // options to open the RocksDB
+  dbOptions.setCreateIfMissing(true)
+  dbOptions.setTableFormatConfig(tableFormatConfig)
+  private val dbLogger = createLogger() // for forwarding RocksDB native logs 
to log4j
+  dbOptions.setStatistics(new Statistics())
+
+  private val workingDir = createTempDir("workingDir")
+  private val fileManager = new RocksDBFileManager(
+    dfsRootDir, createTempDir("manager"), hadoopConf, loggingId = loggingId)
+  private val byteArrayPair = new ByteArrayPair()
+  private val commitLatencyMs = new mutable.HashMap[String, Long]()
+  private val acquireLock = new Object
+
+  @volatile private var db: NativeRocksDB = _
+  @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+  @volatile private var numCommittedKeys = 0L
+  @volatile private var numUncommittedKeys = 0L
+  @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
+
+  workingDir.mkdirs()
+
+  /**
+   * Load the given version of data in a native RocksDB instance.
+   * Note that this will copy all the necessary file from DFS to local disk as 
needed,
+   * and possibly restart the native RocksDB instance.
+   */
+  def load(version: Long): RocksDB = {
+    assert(version >= 0)
+    acquire()
+    logInfo(s"Loading $version")
+    try {
+      if (loadedVersion != version) {
+        closeDB()
+        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        openDB()
+        numUncommittedKeys = metadata.numKeys
+        numCommittedKeys = metadata.numKeys
+        loadedVersion = version
+      }
+      writeBatch.clear()
+      logInfo(s"Loaded $version")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1  // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Get the value for the given key if present, or null.
+   * @note This will return the last written value even if it was uncommitted.

Review comment:
       So it first looks at uncommitted value, if not exist, then looks for 
committed value?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to