This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a5f019554991 [SPARK-45415] Allow selective disabling of "fallocate" in RocksDB statestore
a5f019554991 is described below

commit a5f01955499141c53c619ddf81d6846a72ad789a
Author: Scott Schenkein <scott.schenk...@capitalone.com>
AuthorDate: Thu Oct 12 08:44:13 2023 +0900

[SPARK-45415] Allow selective disabling of "fallocate" in RocksDB statestore

### What changes were proposed in this pull request?
Our Spark environment features a number of parallel structured streaming jobs, many of which use the state store. Most use the state store only for dropDuplicates and hold a tiny amount of information, but a few have a substantially large state store requiring use of RocksDB. In such a configuration, Spark allocates a minimum of `spark.sql.shuffle.partitions * queryCount` partitions, each of which pre-allocates about 74 MB (observed on EMR/Hadoop) of disk storage for RocksDB. This allocati [...]

This PR provides users with the option to simply disable fallocate, so RocksDB uses far less space for the smaller state stores, reducing complexity and disk usage at the expense of write performance.

### Why are the changes needed?
As previously mentioned, these changes allow a Spark context to support many parallel structured streaming jobs when using RocksDB state stores without the need to allocate a glut of excess storage.

### Does this PR introduce _any_ user-facing change?
Users can disable RocksDB's fallocate performance optimization by setting `spark.sql.streaming.stateStore.rocksdb.allowFAllocate=false` (a Scala example follows the test results below).

### How was this patch tested?
1) A few test cases were added.
2) The state store size was validated by running this script with and without fallocate disabled:

```
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import datetime

if disable_fallocate:
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.allowFAllocate", "false")

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

schema = StructType(
    [
        StructField("one", TimestampType(), False),
        StructField("two", StringType(), True),
    ]
)

now = datetime.datetime.now()
data = [(now, y) for y in range(300)]
init_df = spark.createDataFrame(data, schema)

path = "/tmp/stream_try/test"
init_df.write.format("parquet").mode("append").save(path)

stream_df = spark.readStream.schema(schema).format("parquet").load(path)
stream_df = stream_df.dropDuplicates(["one"])

def foreach_batch_function(batch_df, epoch_id):
    batch_df.write.format("parquet").mode("append").option("path", path + "_out").save()

stream_df.writeStream.foreachBatch(foreach_batch_function).option(
    "checkpointLocation", path + "_checkpoint"
).start()
```

With these results (local run, docker container with a small filesystem):

```
allowFAllocate=True (current default)
---------------------
root@0ef384f699e0:/tmp# du -sh spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
808M    spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
 |
 |-->4.1M StateStoreId(opId=0,partId=0,name=default)-d59b907c-8004-47f9-a8a1-dec131f73505
 |--> <snip>
 |-->4.1M StateStoreId(opId=0,partId=199,name=default)-b49a93fe-1007-4e92-8f8f-5767aef41e5c

allowFAllocate=False (new feature)
----------------------
root@0ef384f699e0:/tmp# du -sh spark-00cb768d-2659-453c-8670-4aaf70148041
7.9M    spark-00cb768d-2659-453c-8670-4aaf70148041
 |
 |-->40K StateStoreId(opId=0,partId=0,name=default)-45b38d9c-737b-49b1-bb82-
 |--> <snip>
 |-->40K StateStoreId(opId=0,partId=199,name=default)-28a6cc02-2693-4360-b47a-1f1ab0d54a61
```
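For reference, here is the same configuration expressed in Scala. This is a minimal sketch rather than part of the patch: the local master and application name are illustrative, and it assumes Spark with the RocksDB state store provider on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a local SparkSession with the RocksDB state store provider
// and the new allowFAllocate option turned off.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("rocksdb-fallocate-demo") // hypothetical app name
  .getOrCreate()

// Route streaming state to RocksDB.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// New option introduced by this change; it defaults to "true" (fallocate enabled).
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.allowFAllocate", "false")
```

With `allowFAllocate` set to `false`, each small state store directory stays in the tens of kilobytes rather than several megabytes, matching the `du` output above.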
### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43202 from schenksj/feature/rocksdb_allow_fallocate.

Authored-by: Scott Schenkein <scott.schenk...@capitalone.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md          |  5 +++++
 .../spark/sql/execution/streaming/state/RocksDB.scala   | 15 +++++++++++++--
 .../streaming/state/RocksDBStateStoreSuite.scala        |  2 ++
 .../sql/execution/streaming/state/RocksDBSuite.scala    |  6 ++++++
 4 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 774422a9cd9d..9fb823abaa3a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2385,6 +2385,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
     <td>Total memory to be occupied by blocks in high priority pool as a fraction of memory allocated across all RocksDB instances on a single node using maxMemoryUsageMB.</td>
     <td>0.1</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.allowFAllocate</td>
+    <td>Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance.</td>
+    <td>true</td>
+  </tr>
 </table>
 
 ##### RocksDB State Store Memory Management
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a2868df94117..60249550c4ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -109,6 +109,7 @@ class RocksDB(
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
   dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
+  dbOptions.setAllowFAllocate(conf.allowFAllocate)
 
   if (conf.boundedMemoryUsage) {
     dbOptions.setWriteBufferManager(writeBufferManager)
@@ -674,7 +675,8 @@ case class RocksDBConf(
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
     highPriorityPoolRatio: Double,
-    compressionCodec: String)
+    compressionCodec: String,
+    allowFAllocate: Boolean)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -757,6 +759,14 @@ object RocksDBConf {
   private val HIGH_PRIORITY_POOL_RATIO_CONF = SQLConfEntry(HIGH_PRIORITY_POOL_RATIO_CONF_KEY,
     "0.1")
 
+  // Allow files to be pre-allocated on disk using fallocate
+  // Disabling may slow writes, but can solve an issue where
+  // significant quantities of disk are wasted if there are
+  // many smaller concurrent state-stores running with the
+  // spark context
+  val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
+  private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, "true")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -834,7 +844,8 @@ object RocksDBConf {
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
       getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
-      storeConf.compressionCodec)
+      storeConf.compressionCodec,
+      getBooleanConf(ALLOW_FALLOCATE_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
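As an aside (not part of the patch above), the new conf ultimately toggles RocksDB's own allow_fallocate option via the RocksJava API that Spark already bundles. A minimal standalone sketch, assuming the org.rocksdb dependency is on the classpath and using a hypothetical local path:

```scala
import org.rocksdb.{Options, RocksDB}

object AllowFAllocateDemo {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    // setAllowFAllocate(false) is the same call the patch forwards
    // conf.allowFAllocate into; with it disabled, RocksDB no longer
    // pre-allocates disk space for its log files.
    val options = new Options()
      .setCreateIfMissing(true)
      .setAllowFAllocate(false)
    val db = RocksDB.open(options, "/tmp/fallocate-demo") // hypothetical path
    try {
      db.put("key".getBytes("UTF-8"), "value".getBytes("UTF-8"))
    } finally {
      db.close()
      options.close()
    }
  }
}
```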
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index d1cc7e0b3b9c..82f677a98162 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -86,6 +86,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
       (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
+      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", "false"),
       (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
     )
     testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -115,6 +116,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
       assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
       assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
+      assert(rocksDBConfInTask.allowFAllocate == false)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 764358dc1f09..b5e1eccba339 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1040,6 +1040,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("Verify that fallocate is allowed by default") {
+    val sqlConf = new SQLConf
+    val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+    assert(dbConf.allowFAllocate == true)
+  }
+
   /** RocksDB memory management tests for bounded memory usage */
   test("Memory mgmt - invalid config") {
     withTempDir { dir =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org