HeartSaVioR commented on a change in pull request #33749:
URL: https://github.com/apache/spark/pull/33749#discussion_r690031859



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +

Review comment:
       Could we please describe the case when end users want to set the config 
instead of the default one? Otherwise only a few people can understand how it 
works and why this configuration exists.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +
+        "a streaming query. If this configuration is not set, we will use the 
value in the " +
+        "checkpoint when restarting a streaming query.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(_ >= 0, "Must not be negative")
+      .createWithDefault(5)

Review comment:
       It may be worth having a single-line comment noting that 5 is the latest 
table format version for RocksDB 6.20.3.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -497,23 +497,38 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
   val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
-  private case class ConfEntry(name: String, default: String) {
-    def fullName: String = 
s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+  case class ConfEntry(name: String, default: String) {
+    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}"
   }
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
-  private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
+  val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
   private val PAUSE_BG_WORK_FOR_COMMIT_CONF = 
ConfEntry("pauseBackgroundWorkForCommit", "true")
   private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
-  private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", 
"60000")
+  val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the 
RocksDB version in Spark,

Review comment:
       Nice explanation! It would be nice if we could refer to this from the 
config in SQLConf, which is closer to user-facing - even though it's marked as 
internal, users will find the config in SQLConf first instead of this.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
##########
@@ -62,8 +62,9 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       val testConfs = Seq(
         ("spark.sql.streaming.stateStore.providerClass",
           classOf[RocksDBStateStoreProvider].getName),
-        (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
-        (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10")
+        (RocksDBConf.COMPACT_ON_COMMIT_CONF.fullName, "true"),

Review comment:
       Should we remove this as well in RocksDBConf if we want to have 
consistent behavior, "case sensitive"?
   `val confs = CaseInsensitiveMap[String](storeConf.confs)`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1586,6 +1586,21 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places 
should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the 
checkpoint when starting " +

Review comment:
       And let's emphasize that this config is to enable rollback to an older 
Spark version, and that overriding the value may break the ability to roll back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to