This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new deaa2f7200b [SPARK-44482][CONNECT] Connect server should be able to 
specify the bind address
deaa2f7200b is described below

commit deaa2f7200bb79b5340c722bc8707dd45d50a1c2
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu Jul 27 16:43:01 2023 +0900

    [SPARK-44482][CONNECT] Connect server should be able to specify the bind address
    
    ### What changes were proposed in this pull request?
    When a machine has multiple network cards, we may only want users of a 
certain network segment to be able to connect to the connect server. I propose 
that in addition to specifying port, the connect server can also specify 
address.
    
https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L26-L30
    
    ### Why are the changes needed?
    1. The Connect server should be able to specify the bind address, improving flexibility.
    2. As with other Spark components, a bind address can also be specified, such as:
    
https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java#L147-L149
    
https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/core/src/main/scala/org/apache/spark/SparkEnv.scala#L177-L196
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Manually test.
    - Pass GA.
    
    Closes #42073 from panbingkun/SPARK-44482.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/connect/config/Connect.scala    |  6 ++++++
 .../spark/sql/connect/service/SparkConnectService.scala  | 16 +++++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 13e4b9f5364..31f119047e4 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -23,6 +23,12 @@ import 
org.apache.spark.sql.connect.common.config.ConnectCommon
 object Connect {
   import org.apache.spark.sql.internal.SQLConf.buildStaticConf
 
+  val CONNECT_GRPC_BINDING_ADDRESS =
+    ConfigBuilder("spark.connect.grpc.binding.address")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   val CONNECT_GRPC_BINDING_PORT =
     ConfigBuilder("spark.connect.grpc.binding.port")
       .version("3.4.0")
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index ad40c94d549..6b7007130be 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect.service
 
+import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Ticker
@@ -32,7 +33,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, 
AddArtifactsResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, 
CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
+import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, 
CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
 import org.apache.spark.sql.connect.utils.ErrorUtils
 
 /**
@@ -167,7 +168,7 @@ class SparkConnectService(debug: Boolean)
  * Used to start the overall SparkConnect service and provides global state to 
manage the
  * different SparkSession from different users connecting to the cluster.
  */
-object SparkConnectService {
+object SparkConnectService extends Logging {
 
   private val CACHE_SIZE = 100
 
@@ -245,10 +246,15 @@ object SparkConnectService {
    */
   private def startGRPCService(): Unit = {
     val debugMode = 
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
+    val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
     val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = NettyServerBuilder
-      .forPort(port)
-      
.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+    val sb = bindAddress match {
+      case Some(hostname) =>
+        logInfo(s"start GRPC service at: $hostname")
+        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+      case _ => NettyServerBuilder.forPort(port)
+    }
+    
sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
       .addService(new SparkConnectService(debugMode))
 
     // Add all registered interceptors to the server builder.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to