zsxwing commented on a change in pull request #33749:
URL: https://github.com/apache/spark/pull/33749#discussion_r690836452



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
   val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
-  private case class ConfEntry(name: String, default: String) {
-    def fullName: String = 
s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+  case class ConfEntry(name: String, default: String) {
+    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
   }
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
-  private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+  val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
   private val PAUSE_BG_WORK_FOR_COMMIT_CONF = 
ConfEntry("pauseBackgroundWorkForCommit", "true")
   private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
-  private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", 
"60000")
+  val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the 
RocksDB version in Spark,

Review comment:
       > Nice explanation! It would be nice if we can refer this from config in 
SQLConf which is closer to user facing - despite it's marked as internal, they 
find the config in SQLConf first instead of this.
   
   Made some improvements to the doc of 
`SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
   val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
-  private case class ConfEntry(name: String, default: String) {
-    def fullName: String = 
s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+  case class ConfEntry(name: String, default: String) {
+    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
   }
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
-  private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+  val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
   private val PAUSE_BG_WORK_FOR_COMMIT_CONF = 
ConfEntry("pauseBackgroundWorkForCommit", "true")
   private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
-  private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", 
"60000")
+  val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the 
RocksDB version in Spark,
+  // it may introduce a new table format version that can not be supported by 
an old RocksDB version
+  // used by an old Spark version. Hence, we store the table format version in 
the checkpoint when
+  // a query starts, and when restarting a query from a checkpoint, we will 
use the format version
+  // in the checkpoint. This will ensure the user can still rollback their 
Spark version for an
+  // existing query when RocksDB changes its default table format in a new 
version. The user can

Review comment:
       This is to encourage users to upgrade their Spark version. If we had to 
tell users that, once they upgrade Spark for an existing production query, they 
cannot roll back to the old Spark version because the old version cannot read 
the new checkpoint format, they would probably hesitate to upgrade Spark (they 
may hit regressions in the new Spark version).

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +
+        "a streaming query. If this configuration is not set, we will use the 
value in the " +

Review comment:
       We would like to write `5` to the checkpoint when the config is not set. 
That's why we need a default value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to