This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b174aaa42e6 [SPARK-44021][SQL] Add spark.sql.files.maxPartitionNum
b174aaa42e6 is described below

commit b174aaa42e6d405a1d157534eec9623436c3232b
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Tue Jun 13 18:13:24 2023 -0700

    [SPARK-44021][SQL] Add spark.sql.files.maxPartitionNum
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new SQL config: `spark.sql.files.maxPartitionNum`. Users can 
set it to avoid generating too many partitions when reading file-based sources. 
Too many partitions increase various overheads on the driver and can cause 
the shuffle service to run out of memory (OOM).
    
    The following is the GC log of the Shuffle service:
    ```
    2023-06-08T01:41:01.871-0700: 7303.965: [Full GC (Allocation Failure) 
2023-06-08T01:41:01.871-0700: 7303.965: [CMS: 4194304K->4194304K(4194304K), 
7.4010107 secs]2023-06-08T01:41:09.272-0700: 7311.366: [Class Histogram (after 
full gc):
     num     #instances         #bytes  class name
    ----------------------------------------------
       1:       7110660     2334927400  [C
       2:      19465810      467514416  [I
       3:       6754570      270182800  
org.apache.spark.network.protocol.ChunkFetchRequest
       4:       6661155      266446200  
org.sparkproject.io.netty.channel.DefaultChannelPromise
       5:       6639056      265562240  
org.apache.spark.network.buffer.FileSegmentManagedBuffer
       6:       6639055      265562200  
org.apache.spark.network.protocol.RequestTraceInfo
       7:       6663764      213240448  
org.sparkproject.io.netty.util.Recycler$DefaultHandle
       8:       6659382      213100224  
org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask
       9:       6659218      213094976  
org.apache.spark.network.server.ChunkFetchRequestHandler$$Lambda$156/886274988
      10:       6640444      212494208  java.io.File
    ...
    ```
    
    ### Why are the changes needed?
    
    1. The PR aims to selectively rescale only large RDDs, while keeping the 
existing behavior unchanged for small RDDs. Directly increasing 
`spark.sql.files.maxPartitionBytes` is therefore not acceptable:
       1. A single SQL query can read from multiple data sources, and setting 
`spark.sql.files.maxPartitionBytes` will affect all of them.
       2. We don't know what value `spark.sql.files.maxPartitionBytes` should be 
set to; sometimes it may need to be very large (more than 20 GiB).
    
    2. To avoid generating too many partitions for a very large partitioned 
and bucketed table, because a bucket scan is not always used since 
[SPARK-32859](https://issues.apache.org/jira/browse/SPARK-32859).
    
       Before SPARK-32859 | After SPARK-32859
       --- | ---
       <img width="400" 
src="https://github.com/apache/spark/assets/5399861/5e14932b-aa3d-4b14-b80c-e3ff348958c4">
 | <img width="400" 
src="https://github.com/apache/spark/assets/5399861/170311a0-c086-408a-9d95-17031e21e16a">
    
    3. Avoid generating too many partitions if there are lots of small files.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Unless the user sets `spark.sql.files.maxPartitionNum`.
    
    ### How was this patch tested?
    
    Unit test and manual testing:
    
    Before this PR | After this PR and `set 
spark.sql.files.maxPartitionNum=20000`
    -- | --
    <img width="400" 
src="https://github.com/apache/spark/assets/5399861/ffda1850-cd4a-4970-a4e5-e1e43177135a">
 | <img width="330" 
src="https://github.com/apache/spark/assets/5399861/1df7cac7-fe82-4af3-b3ec-91aa23c79a8b">
    
    Closes #41545 from wangyum/SPARK-44021.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 docs/sql-performance-tuning.md                     | 11 ++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 +++++++++
 .../sql/execution/datasources/FilePartition.scala  | 31 +++++++++++++++++++---
 .../datasources/FileSourceStrategySuite.scala      | 23 ++++++++++++++++
 4 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index 36b02d122bd..1467409bb50 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -95,6 +95,17 @@ that these options will be deprecated in future release as 
more optimizations ar
     </td>
     <td>3.1.0</td>
   </tr>
+  <tr>
+    <td><code>spark.sql.files.maxPartitionNum</code></td>
+    <td>None</td>
+    <td>
+      The suggested (not guaranteed) maximum number of split file partitions. 
If it is set,
+      Spark will rescale each partition to make the number of partitions is 
close to this
+      value if the initial number of partitions exceeds this value. This 
configuration is
+      effective only when using file-based sources such as Parquet, JSON and 
ORC.
+    </td>
+    <td>3.5.0</td>
+  </tr>
   <tr>
     <td><code>spark.sql.broadcastTimeout</code></td>
     <td>300</td>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 47b8474953b..2fb3f17b725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1749,6 +1749,16 @@ object SQLConf {
     .checkValue(v => v > 0, "The min partition number must be a positive 
integer.")
     .createOptional
 
+  val FILES_MAX_PARTITION_NUM = buildConf("spark.sql.files.maxPartitionNum")
+    .doc("The suggested (not guaranteed) maximum number of split file 
partitions. If it is set, " +
+      "Spark will rescale each partition to make the number of partitions is 
close to this " +
+      "value if the initial number of partitions exceeds this value. This 
configuration is " +
+      "effective only when using file-based sources such as Parquet, JSON and 
ORC.")
+    .version("3.5.0")
+    .intConf
+    .checkValue(v => v > 0, "The maximum number of partitions must be a 
positive integer.")
+    .createOptional
+
   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will 
continue to run when " +
       "encountering corrupted files and the contents that have been read will 
still be returned. " +
@@ -4509,6 +4519,8 @@ class SQLConf extends Serializable with Logging {
 
   def filesMinPartitionNum: Option[Int] = getConf(FILES_MIN_PARTITION_NUM)
 
+  def filesMaxPartitionNum: Option[Int] = getConf(FILES_MAX_PARTITION_NUM)
+
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index a4d16a0fd2b..8169652c6f0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -18,11 +18,13 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A collection of file blocks that should be read as a single task
@@ -50,10 +52,10 @@ case class FilePartition(index: Int, files: 
Array[PartitionedFile])
 
 object FilePartition extends Logging {
 
-  def getFilePartitions(
-      sparkSession: SparkSession,
+  private def getFilePartitions(
       partitionedFiles: Seq[PartitionedFile],
-      maxSplitBytes: Long): Seq[FilePartition] = {
+      maxSplitBytes: Long,
+      openCostInBytes: Long): Seq[FilePartition] = {
     val partitions = new ArrayBuffer[FilePartition]
     val currentFiles = new ArrayBuffer[PartitionedFile]
     var currentSize = 0L
@@ -69,7 +71,6 @@ object FilePartition extends Logging {
       currentSize = 0
     }
 
-    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
     // Assign files to partitions using "Next Fit Decreasing"
     partitionedFiles.foreach { file =>
       if (currentSize + file.length > maxSplitBytes) {
@@ -83,6 +84,28 @@ object FilePartition extends Logging {
     partitions.toSeq
   }
 
+  def getFilePartitions(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile],
+      maxSplitBytes: Long): Seq[FilePartition] = {
+    val openCostBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    val maxPartitionNum = sparkSession.sessionState.conf.filesMaxPartitionNum
+    val partitions = getFilePartitions(partitionedFiles, maxSplitBytes, 
openCostBytes)
+    if (maxPartitionNum.exists(partitions.size > _)) {
+      val totalSizeInBytes =
+        partitionedFiles.map(_.length + 
openCostBytes).map(BigDecimal(_)).sum[BigDecimal]
+      val desiredSplitBytes =
+        (totalSizeInBytes / BigDecimal(maxPartitionNum.get)).setScale(0, 
RoundingMode.UP).longValue
+      val desiredPartitions = getFilePartitions(partitionedFiles, 
desiredSplitBytes, openCostBytes)
+      logWarning(s"The number of partitions is ${partitions.size}, which 
exceeds the maximum " +
+        s"number configured: $maxPartitionNum. Spark rescales it to 
${desiredPartitions.size} " +
+        s"by ignoring the configuration of 
${SQLConf.FILES_MAX_PARTITION_BYTES.key}.")
+      desiredPartitions
+    } else {
+      partitions
+    }
+  }
+
   def maxSplitBytes(
       sparkSession: SparkSession,
       selectedPartitions: Seq[PartitionDirectory]): Long = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 26655c2d95a..91182f6473d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -602,6 +602,29 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSparkSession {
     checkDataFilters(Set.empty)
   }
 
+  test(s"SPARK-44021: Test ${SQLConf.FILES_MAX_PARTITION_NUM.key} works as 
expected") {
+    val files =
+      Range(0, 300000).map(p => PartitionedFile(InternalRow.empty, sp(s"$p"), 
0, 50000000))
+    val maxPartitionBytes = conf.filesMaxPartitionBytes
+    val defaultPartitions = FilePartition.getFilePartitions(spark, files, 
maxPartitionBytes)
+    assert(defaultPartitions.size === 150000)
+
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_NUM.key -> "20000") {
+      val partitions = FilePartition.getFilePartitions(spark, files, 
maxPartitionBytes)
+      assert(partitions.size === 20000)
+    }
+
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_NUM.key -> "50000") {
+      val partitions = FilePartition.getFilePartitions(spark, files, 
maxPartitionBytes)
+      assert(partitions.size === 50000)
+    }
+
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_NUM.key -> "200000") {
+      val partitions = FilePartition.getFilePartitions(spark, files, 
maxPartitionBytes)
+      assert(partitions.size === defaultPartitions.size)
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to