viirya commented on a change in pull request #33749:
URL: https://github.com/apache/spark/pull/33749#discussion_r690673605



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +
+        "a streaming query. If this configuration is not set, we will use the 
value in the " +

Review comment:
       This sounds more like an optional config? If it is optional, it seems 
we don't need to have a default value here.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
   val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
-  private case class ConfEntry(name: String, default: String) {
-    def fullName: String = 
s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+  case class ConfEntry(name: String, default: String) {
+    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
   }
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
-  private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+  val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
   private val PAUSE_BG_WORK_FOR_COMMIT_CONF = 
ConfEntry("pauseBackgroundWorkForCommit", "true")
   private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
-  private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", 
"60000")
+  val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the 
RocksDB version in Spark,
+  // it may introduce a new table format version that can not be supported by 
an old RocksDB version
+  // used by an old Spark version. Hence, we store the table format version in 
the checkpoint when
+  // a query starts, and when restarting a query from a checkpoint, we will 
use the format version
+  // in the checkpoint. This will ensure the user can still rollback their 
Spark version for an
+  // existing query when RocksDB changes its default table format in a new 
version. The user can

Review comment:
       Once we use the format version in the checkpoint when restarting a 
query, it will just use the original format in RocksDB instead of the new 
format, so why would the user need to roll back the Spark version?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to