This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 42bc297d5b9 [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
42bc297d5b9 is described below

commit 42bc297d5b9149730f91af26c857897bf4d6aa91
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Wed Dec 7 16:30:28 2022 +0800

    [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
    
    ### What changes were proposed in this pull request?
    Add a new config, spark.connect.grpc.arrow.maxBatchSize, to replace the hardcoded 4 MiB cap on Arrow batches sent from the Spark Connect server to the client.
    
    ### Why are the changes needed?
    The maximum Arrow batch size was previously hardcoded in SparkConnectStreamHandler; making it configurable lets users tune the batch size for their workloads.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, one new configuration: spark.connect.grpc.arrow.maxBatchSize.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #38958 from zhengruifeng/connect_config_batchsize.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../main/scala/org/apache/spark/sql/connect/config/Connect.scala | 9 +++++++++
 .../spark/sql/connect/service/SparkConnectStreamHandler.scala    | 9 ++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)
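
For context, the new entry behaves like any other Spark conf; below is a minimal sketch of overriding it at server startup, assuming a plain SparkConf (the 8m value is illustrative, not from this patch):

```scala
import org.apache.spark.SparkConf

// Illustrative only: raise the limit from the 4 MiB default to 8 MiB.
// The handler reads this from SparkEnv.get.conf, so it must be set on the
// conf the server starts with, e.g. equivalently via
// --conf spark.connect.grpc.arrow.maxBatchSize=8m on spark-submit.
val conf = new SparkConf()
  .set("spark.connect.grpc.arrow.maxBatchSize", "8m")
```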

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 76d159cfd15..a17e9784ec6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.connect.config
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 private[spark] object Connect {
 
@@ -34,4 +35,12 @@ private[spark] object Connect {
       .version("3.4.0")
       .stringConf
       .createOptional
+
+  val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
+    ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
+      .doc("When using Apache Arrow, limit the maximum size of one arrow batch 
that " +
+        "can be sent from server side to client side.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.MiB)
+      .createWithDefaultString("4m")
 }
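
One subtlety in the entry above: bytesConf(ByteUnit.MiB) parses the size string into the declared unit, so the "4m" default reads back as 4 (mebibytes), not as a raw byte count. A minimal sketch of that behavior follows; it only compiles inside Spark's own codebase because ConfigBuilder is private[spark], and the entry name is hypothetical:

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Hypothetical entry mirroring the one added in this patch.
val demo = ConfigBuilder("demo.arrow.maxBatchSize")
  .bytesConf(ByteUnit.MiB)
  .createWithDefaultString("4m")

// The default is stored in the declared unit: Some(4), not Some(4194304).
val inMiB = demo.defaultValue
// Convert explicitly when a byte count is needed: 4 MiB = 4194304 bytes.
val inBytes = ByteUnit.MiB.toBytes(inMiB.get)
```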
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index b5d100e894d..fcae3501cef 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -23,11 +23,13 @@ import scala.util.control.NonFatal
 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
@@ -38,9 +40,6 @@ import org.apache.spark.util.ThreadUtils
 class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResponse])
     extends Logging {
 
-  // The maximum batch size in bytes for a single batch of data to be returned 
via proto.
-  private val MAX_BATCH_SIZE: Long = 4 * 1024 * 1024
-
   def handle(v: ExecutePlanRequest): Unit = {
     val session =
       SparkConnectService.getOrCreateIsolatedSession(v.getUserContext.getUserId).session
@@ -64,12 +63,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     val schema = dataframe.schema
     val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+    // Conservatively sets it 70% because the size is not accurate but estimated.
+    val maxBatchSize = (SparkEnv.get.conf.get(CONNECT_GRPC_ARROW_MAX_BATCH_SIZE) * 0.7).toLong
 
     SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
       val rows = dataframe.queryExecution.executedPlan.execute()
       val numPartitions = rows.getNumPartitions
-      // Conservatively sets it 70% because the size is not accurate but estimated.
-      val maxBatchSize = (MAX_BATCH_SIZE * 0.7).toLong
       var numSent = 0
 
       if (numPartitions > 0) {
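
Downstream, maxBatchSize caps the estimated size of each Arrow batch streamed back to the client; with the 4 MiB default, the 70% headroom targets 0.7 x 4 MiB = 2.8 MiB per batch to absorb estimation error. The grouping pattern itself is simple; here is a self-contained sketch of size-capped batching, not the actual ArrowConverters code (the size estimator is caller-supplied):

```scala
import scala.collection.mutable.ArrayBuffer

// Group items into batches whose estimated total size stays under a cap.
// The first item of a batch is always taken, so an oversized item still
// makes progress as a singleton batch.
def batchBySize[T](items: Iterator[T], maxBatchBytes: Long)(
    estimate: T => Long): Iterator[Seq[T]] = new Iterator[Seq[T]] {
  private val buffered = items.buffered
  def hasNext: Boolean = buffered.hasNext
  def next(): Seq[T] = {
    val batch = ArrayBuffer(buffered.next())
    var size = estimate(batch.head)
    while (buffered.hasNext && size + estimate(buffered.head) <= maxBatchBytes) {
      val item = buffered.next()
      size += estimate(item)
      batch += item
    }
    batch.toSeq
  }
}

// Example: 1 KiB strings under a 4 KiB cap -> batches of 4, 4, then 2.
val groups = batchBySize(Iterator.fill(10)("x" * 1024), 4096)(_.length.toLong)
```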

