xuanyuanking commented on a change in pull request #33749:
URL: https://github.com/apache/spark/pull/33749#discussion_r690622034



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +
+        "a streaming query. If this configuration is not set, we will use the 
value in the " +
+        "checkpoint when restarting a streaming query.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(_ >= 0, "Must not be negative")
+      .createWithDefault(5)

Review comment:
       +1, it's a little strange that the version starts from 5.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
   val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
-  private case class ConfEntry(name: String, default: String) {
-    def fullName: String = 
s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+  case class ConfEntry(name: String, default: String) {
+    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
   }
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
-  private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+  val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
   private val PAUSE_BG_WORK_FOR_COMMIT_CONF = 
ConfEntry("pauseBackgroundWorkForCommit", "true")
   private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
-  private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", 
"60000")
+  val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the 
RocksDB version in Spark,
+  // it may introduce a new table format version that can not be supported by 
an old RocksDB version
+  // used by an old Spark version. Hence, we store the table format version in 
the checkpoint when
+  // a query starts, and when restarting a query from a checkpoint, we will 
use the format version
+  // in the checkpoint. This will ensure the user can still rollback their 
Spark version for an
+  // existing query when RocksDB changes its default table format in a new 
version. The user can
+  // still use this config to ignore the table format version in the 
checkpoint if they don't need
+  // to rollback the Spark version. See
+  // 
https://github.com/facebook/rocksdb/wiki/RocksDB-Compatibility-Between-Different-Releases
+  // for the RocksDB compatibility guarantee.
+  //
+  // Note: this is also defined in 
`SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION`. These two

Review comment:
       Seems like a typo? I think this should be `SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -531,14 +546,22 @@ object RocksDBConf {
       }
     }
 
+    def getPositiveIntConf(conf: ConfEntry): Int = {
+      Try { confs.getOrElse(conf.fullName, conf.default).toInt } filter { _ >= 
0 } getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value for '${conf.fullName}', must be a positive integer")
+      }
+    }
+
     RocksDBConf(
       storeConf.minVersionsToRetain,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
       getBooleanConf(PAUSE_BG_WORK_FOR_COMMIT_CONF),

Review comment:
       Not related to this PR, but PAUSE_BG_WORK_FOR_COMMIT_CONF is never used. I'll delete it in a follow-up PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to