This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new deaa2f7200b [SPARK-44482][CONNECT] Connect server should can specify the bind address deaa2f7200b is described below commit deaa2f7200bb79b5340c722bc8707dd45d50a1c2 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Thu Jul 27 16:43:01 2023 +0900 [SPARK-44482][CONNECT] Connect server should can specify the bind address ### What changes were proposed in this pull request? When a machine has multiple network cards, we may only want users of a certain network segment to be able to connect to the connect server. I propose that, in addition to specifying the port, the connect server can also specify the bind address. https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L26-L30 ### Why are the changes needed? 1. The Connect server should be able to specify the bind address, improving flexibility. 2. As with other Spark components, a bind address can also be specified, such as: https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java#L147-L149 https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/core/src/main/scala/org/apache/spark/SparkEnv.scala#L177-L196 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Manually tested. - Passed GA. Closes #42073 from panbingkun/SPARK-44482. 
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../org/apache/spark/sql/connect/config/Connect.scala | 6 ++++++ .../spark/sql/connect/service/SparkConnectService.scala | 16 +++++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 13e4b9f5364..31f119047e4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -23,6 +23,12 @@ import org.apache.spark.sql.connect.common.config.ConnectCommon object Connect { import org.apache.spark.sql.internal.SQLConf.buildStaticConf + val CONNECT_GRPC_BINDING_ADDRESS = + ConfigBuilder("spark.connect.grpc.binding.address") + .version("4.0.0") + .stringConf + .createOptional + val CONNECT_GRPC_BINDING_PORT = ConfigBuilder("spark.connect.grpc.binding.port") .version("3.4.0") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index ad40c94d549..6b7007130be 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connect.service +import java.net.InetSocketAddress import java.util.concurrent.TimeUnit import com.google.common.base.Ticker @@ -32,7 +33,7 @@ import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} +import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} import org.apache.spark.sql.connect.utils.ErrorUtils /** @@ -167,7 +168,7 @@ class SparkConnectService(debug: Boolean) * Used to start the overall SparkConnect service and provides global state to manage the * different SparkSession from different users connecting to the cluster. */ -object SparkConnectService { +object SparkConnectService extends Logging { private val CACHE_SIZE = 100 @@ -245,10 +246,15 @@ object SparkConnectService { */ private def startGRPCService(): Unit = { val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true) + val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS) val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT) - val sb = NettyServerBuilder - .forPort(port) - .maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt) + val sb = bindAddress match { + case Some(hostname) => + logInfo(s"start GRPC service at: $hostname") + NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port)) + case _ => NettyServerBuilder.forPort(port) + } + sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt) .addService(new SparkConnectService(debugMode)) // Add all registered interceptors to the server builder. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org