This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f019554991 [SPARK-45415] Allow selective disabling of "fallocate" in 
RocksDB statestore
a5f019554991 is described below

commit a5f01955499141c53c619ddf81d6846a72ad789a
Author: Scott Schenkein <scott.schenk...@capitalone.com>
AuthorDate: Thu Oct 12 08:44:13 2023 +0900

    [SPARK-45415] Allow selective disabling of "fallocate" in RocksDB statestore
    
    ### What changes were proposed in this pull request?
    Our spark environment features a number of parallel structured streaming 
jobs, many of which have use state store.  Most use state store for 
dropDuplicates and work with a tiny amount of information, but a few have a 
substantially large state store requiring use of RocksDB.  In such a 
configuration, spark allocates a minimum of `spark.sql.shuffle.partitions * 
queryCount` partitions, each of which pre-allocate about 74mb (observed on 
EMR/Hadoop) disk storage for RocksDB.  This allocati [...]
    
    This PR provides users with the option to simply disable fallocate so 
RocksDB uses far less space for the smaller state stores, reducing complexity 
and disk storage at the expense of performance.
    
    ### Why are the changes needed?
    
    As previously mentioned, these changes allow a spark context to support 
many parallel structured streaming jobs when using RocksDB state stores without 
the need to allocate a glut of excess storage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Users disable the fallocate rocksdb performance optimization by configuring 
`spark.sql.streaming.stateStore.rocksdb.allowFAllocate=false`
    
    ### How was this patch tested?
    
    1) A few test cases were added
    2) The state store size was validated by running this script with and 
without fallocate disabled
    
    ```
    from pyspark.sql.types import StructType, StructField, StringType, 
TimestampType
    import datetime
    
    if disable_fallocate:
       spark.conf.set("spark.sql.streaming.stateStore.rocksdb.allowFAllocate", 
"false")
    
    spark.conf.set(
        "spark.sql.streaming.stateStore.providerClass",
        
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    )
    
    schema = StructType(
        [
            StructField("one", TimestampType(), False),
            StructField("two", StringType(), True),
        ]
    )
    
    now = datetime.datetime.now()
    data = [(now, y) for y in range(300)]
    init_df = spark.createDataFrame(data, schema)
    
    path = "/tmp/stream_try/test"
    init_df.write.format("parquet").mode("append").save(path)
    
    stream_df = spark.readStream.schema(schema).format("parquet").load(path)
    
    stream_df = stream_df.dropDuplicates(["one"])
    
    def foreach_batch_function(batch_df, epoch_id):
        batch_df.write.format("parquet").mode("append").option("path", path + 
"_out").save()
    
    stream_df.writeStream.foreachBatch(foreach_batch_function).option(
        "checkpointLocation", path + "_checkpoint"
    ).start()
    ```
    
    With these results (local run, docker container with small FS)
    ```
    allowFAllocate=True (current default)
    ---------------------
    root0ef384f699e0:/tmp# du -sh spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
    808M    spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
    |
    |-->4.1M        
StateStoreId(opId=0,partId=0,name=default)-d59b907c-8004-47f9-a8a1-dec131f73505
    |--> <snip>
    |-->4.1M        
StateStoreId(opId=0,partId=199,name=default)-b49a93fe-1007-4e92-8f8f-5767aef41e5c
    
    allowFAllocate=False (new feature)
    ----------------------
    root0ef384f699e0:/tmp# du -sh spark-00cb768d-2659-453c-8670-4aaf70148041
    
    7.9M    spark-00cb768d-2659-453c-8670-4aaf70148041
    |
    |-->40K StateStoreId(opId=0,partId=0,name=default)-45b38d9c-737b-49b1-bb82-
    |--> <snip>
    |-->40K 
StateStoreId(opId=0,partId=199,name=default)-28a6cc02-2693-4360-b47a-1f1ab0d54a61
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43202 from schenksj/feature/rocksdb_allow_fallocate.
    
    Authored-by: Scott Schenkein <scott.schenk...@capitalone.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md            |  5 +++++
 .../spark/sql/execution/streaming/state/RocksDB.scala     | 15 +++++++++++++--
 .../streaming/state/RocksDBStateStoreSuite.scala          |  2 ++
 .../sql/execution/streaming/state/RocksDBSuite.scala      |  6 ++++++
 4 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 774422a9cd9d..9fb823abaa3a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2385,6 +2385,11 @@ Here are the configs regarding to RocksDB instance of 
the state store provider:
     <td>Total memory to be occupied by blocks in high priority pool as a 
fraction of memory allocated across all RocksDB instances on a single node 
using maxMemoryUsageMB.</td>
     <td>0.1</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.allowFAllocate</td>
+    <td>Allow the rocksdb runtime to use fallocate to pre-allocate disk space 
for logs, etc...  Disable for apps that have many smaller state stores to trade 
off disk space for write performance.</td>
+    <td>true</td>
+  </tr>
 </table>
 
 ##### RocksDB State Store Memory Management
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a2868df94117..60249550c4ed 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -109,6 +109,7 @@ class RocksDB(
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
   dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
+  dbOptions.setAllowFAllocate(conf.allowFAllocate)
 
   if (conf.boundedMemoryUsage) {
     dbOptions.setWriteBufferManager(writeBufferManager)
@@ -674,7 +675,8 @@ case class RocksDBConf(
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
     highPriorityPoolRatio: Double,
-    compressionCodec: String)
+    compressionCodec: String,
+    allowFAllocate: Boolean)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -757,6 +759,14 @@ object RocksDBConf {
   private val HIGH_PRIORITY_POOL_RATIO_CONF = 
SQLConfEntry(HIGH_PRIORITY_POOL_RATIO_CONF_KEY,
     "0.1")
 
+  // Allow files to be pre-allocated on disk using fallocate
+  // Disabling may slow writes, but can solve an issue where
+  // significant quantities of disk are wasted if there are
+  // many smaller concurrent state-stores running with the
+  // spark context
+  val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
+  private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, 
"true")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -834,7 +844,8 @@ object RocksDBConf {
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
       getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
-      storeConf.compressionCodec)
+      storeConf.compressionCodec,
+      getBooleanConf(ALLOW_FALLOCATE_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index d1cc7e0b3b9c..82f677a98162 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -86,6 +86,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", 
"3"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", 
"16"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", 
"false"),
         (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
       )
       testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -115,6 +116,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
       assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
       assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
+      assert(rocksDBConfInTask.allowFAllocate == false)
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 764358dc1f09..b5e1eccba339 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1040,6 +1040,12 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("Verify that fallocate is allowed by default") {
+     val sqlConf = new SQLConf
+     val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+     assert(dbConf.allowFAllocate == true)
+  }
+
  /** RocksDB memory management tests for bounded memory usage */
   test("Memory mgmt - invalid config") {
     withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to