HeartSaVioR commented on a change in pull request #33749: URL: https://github.com/apache/spark/pull/33749#discussion_r690031859
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ########## @@ -1586,6 +1586,21 @@ object SQLConf { .stringConf .createWithDefault("lz4") + /** + * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated + * together. + */ + val STATE_STORE_ROCKSDB_FORMAT_VERSION = + buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion") + .internal() + .doc("Set the RocksDB format version. This will be stored in the checkpoint when starting " + Review comment: Could we please describe the case when end users want to set the config instead of the default one? Otherwise only a few people can understand how it works and why this configuration exists. ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ########## @@ -1586,6 +1586,21 @@ object SQLConf { .stringConf .createWithDefault("lz4") + /** + * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated + * together. + */ + val STATE_STORE_ROCKSDB_FORMAT_VERSION = + buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion") + .internal() + .doc("Set the RocksDB format version. This will be stored in the checkpoint when starting " + + "a streaming query. If this configuration is not set, we will use the value in the " + + "checkpoint when restarting a streaming query.") + .version("3.2.0") + .intConf + .checkValue(_ >= 0, "Must not be negative") + .createWithDefault(5) Review comment: It may be worth having a single-line comment that 5 is the latest table format version for RocksDB 6.20.3. 
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala ########## @@ -497,23 +497,38 @@ case class RocksDBConf( blockSizeKB: Long, blockCacheSizeMB: Long, lockAcquireTimeoutMs: Long, - resetStatsOnLoad : Boolean) + resetStatsOnLoad : Boolean, + formatVersion: Int) object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" - private case class ConfEntry(name: String, default: String) { - def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) + case class ConfEntry(name: String, default: String) { + def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}" } // Configuration that specifies whether to compact the RocksDB data every time data is committed - private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false") + val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false") private val PAUSE_BG_WORK_FOR_COMMIT_CONF = ConfEntry("pauseBackgroundWorkForCommit", "true") private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4") private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8") - private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000") + val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000") private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true") + // Configuration to set the RocksDB format version. When upgrading the RocksDB version in Spark, Review comment: Nice explanation! It would be nice if we could refer to this from the config in SQLConf, which is closer to user-facing - despite it being marked as internal, users will find the config in SQLConf first instead of this. 
########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala ########## @@ -62,8 +62,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val testConfs = Seq( ("spark.sql.streaming.stateStore.providerClass", classOf[RocksDBStateStoreProvider].getName), - (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".compactOnCommit", "true"), - (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10") + (RocksDBConf.COMPACT_ON_COMMIT_CONF.fullName, "true"), Review comment: Should we remove this as well in RocksDBConf if we want to have consistent, "case sensitive" behavior? `val confs = CaseInsensitiveMap[String](storeConf.confs)` ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ########## @@ -1586,6 +1586,21 @@ object SQLConf { .stringConf .createWithDefault("lz4") + /** + * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated + * together. + */ + val STATE_STORE_ROCKSDB_FORMAT_VERSION = + buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion") + .internal() + .doc("Set the RocksDB format version. This will be stored in the checkpoint when starting " + Review comment: And let's emphasize that this config is to enable rollback to an older Spark version, and overriding the value may break the ability to roll back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org