This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new da19e0de05b [SPARK-42816][CONNECT] Support Max Message size up to 128MB
da19e0de05b is described below

commit da19e0de05b4fbccae2c21385e67256ff31b1f1a
Author: Martin Grund <martin.gr...@databricks.com>
AuthorDate: Tue Mar 21 18:12:57 2023 -0700

    [SPARK-42816][CONNECT] Support Max Message size up to 128MB
    
    ### What changes were proposed in this pull request?
    
    This change lifts the default message size of 4MB to 128MB and makes it 
configurable. While 128MB is a "random number" it supports creating DataFrames 
from reasonably sized local data without failing.
    
    ### Why are the changes needed?
    Usability
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual
    
    Closes #40447 from grundprinzip/SPARK-42816.
    
    Lead-authored-by: Martin Grund <martin.gr...@databricks.com>
    Co-authored-by: Martin Grund <grundprin...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/sql/connect/client/SparkConnectClient.scala |  1 +
 .../spark/sql/connect/common/config/ConnectCommon.scala      |  1 +
 .../scala/org/apache/spark/sql/connect/config/Connect.scala  |  8 ++++++++
 .../spark/sql/connect/service/SparkConnectService.scala      |  3 ++-
 docs/configuration.md                                        |  8 ++++++++
 python/pyspark/sql/connect/client.py                         | 12 +++++++++++-
 python/pyspark/sql/tests/connect/test_connect_basic.py       |  9 +++++++++
 7 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 60d6d202ff5..a298c526883 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -406,6 +406,7 @@ private[sql] object SparkConnectClient {
       if (metadata.nonEmpty) {
         channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
       }
+      
channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
       new SparkConnectClient(
         userContextBuilder.build(),
         channelBuilder,
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
index 48ae4d2d77f..3f594d79b62 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala
@@ -18,4 +18,5 @@ package org.apache.spark.sql.connect.common.config
 
 private[connect] object ConnectCommon {
   val CONNECT_GRPC_BINDING_PORT: Int = 15002
+  val CONNECT_GRPC_MAX_MESSAGE_SIZE: Int = 128 * 1024 * 1024;
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 64b5bd5d813..19fdad97b5f 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -47,6 +47,14 @@ object Connect {
       .bytesConf(ByteUnit.MiB)
       .createWithDefaultString("4m")
 
+  val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
+    ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
+      .doc("Sets the maximum inbound message in bytes size for the gRPC 
requests." +
+        "Requests with a larger payload will fail.")
+      .version("3.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
+
   val CONNECT_EXTENSIONS_RELATION_CLASSES =
     ConfigBuilder("spark.connect.extensions.relation.classes")
       .doc("""
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index cd353b6ff60..a9442a8c92c 100755
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -42,7 +42,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, 
AddArtifactsResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, 
CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
 
 /**
  * The SparkConnectService implementation.
@@ -276,6 +276,7 @@ object SparkConnectService {
     val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
     val sb = NettyServerBuilder
       .forPort(port)
+      
.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
       .addService(new SparkConnectService(debugMode))
 
     // Add all registered interceptors to the server builder.
diff --git a/docs/configuration.md b/docs/configuration.md
index 5b29261c58a..e136fef9586 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3171,6 +3171,14 @@ They are typically set via the config file and 
command-lineoptions with `--conf/
   <td>When using Apache Arrow, limit the maximum size of one arrow batch that 
can be sent from server side to client side. Currently, we conservatively use 
70% of it because the size is not accurate but estimated.</td>
   <td>3.4.0</td>
 </tr>
+<tr>
+  <td><code>spark.connect.grpc.maxInboundMessageSize</code></td>
+  <td>
+    134217728
+  </td>
+  <td>Sets the maximum inbound message size for the gRPC requests. Requests 
with a larger payload will fail.</td>
+  <td>3.4.0</td>
+</tr>
 <tr>
   <td><code>spark.connect.extensions.relation.classes</code></td>
   <td>
diff --git a/python/pyspark/sql/connect/client.py 
b/python/pyspark/sql/connect/client.py
index 53fa97372a7..4db852c951e 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -123,6 +123,7 @@ class ChannelBuilder:
     PARAM_TOKEN = "token"
     PARAM_USER_ID = "user_id"
     PARAM_USER_AGENT = "user_agent"
+    MAX_MESSAGE_LENGTH = 128 * 1024 * 1024
 
     @staticmethod
     def default_port() -> int:
@@ -177,7 +178,16 @@ class ChannelBuilder:
                 f"Path component for connection URI must be empty: 
{self.url.path}"
             )
         self._extract_attributes()
-        self._channel_options = channelOptions
+
+        GRPC_DEFAULT_OPTIONS = [
+            ("grpc.max_send_message_length", 
ChannelBuilder.MAX_MESSAGE_LENGTH),
+            ("grpc.max_receive_message_length", 
ChannelBuilder.MAX_MESSAGE_LENGTH),
+        ]
+
+        if channelOptions is None:
+            self._channel_options = GRPC_DEFAULT_OPTIONS
+        else:
+            self._channel_options = GRPC_DEFAULT_OPTIONS + channelOptions
 
     def _extract_attributes(self) -> None:
         if len(self.url.params) > 0:
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 491865ad9c9..0a47dc193c9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2934,6 +2934,15 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         self.assertEqual(cdf2.schema, sdf2.schema)
         self.assertEqual(cdf2.collect(), sdf2.collect())
 
+    def test_large_client_data(self):
+        # SPARK-42816 support more than 4MB message size.
+        # ~200bytes
+        cols = ["abcdefghijklmnoprstuvwxyz" for x in range(10)]
+        # 100k rows => 20MB
+        row_count = 100 * 1000
+        rows = [cols] * row_count
+        self.assertEqual(row_count, 
self.connect.createDataFrame(data=rows).count())
+
     def test_unsupported_jvm_attribute(self):
         # Unsupported jvm attributes for Spark session.
         unsupported_attrs = ["_jsc", "_jconf", "_jvm", "_jsparkSession"]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to