Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]
jshmchenxi commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2194649550

@TakawaAkirayo That's good to know. Thanks for sharing! Side question: how do you manage the termination of Spark Connect servers? Users of JupyterHub usually restart or stop a kernel; would that leak the Connect server?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. For additional commands, e-mail: reviews-h...@spark.apache.org.
HyukjinKwon closed pull request #46182: [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn URL: https://github.com/apache/spark/pull/46182
HyukjinKwon commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2153602043 Merged to master.
TakawaAkirayo commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2152575211 Thanks for your review! @grundprinzip @HyukjinKwon, after the rebase and fix, the failure is now resolved.
HyukjinKwon commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2144252644 Let's fix up the CI failure. Otherwise, looks good to go.
TakawaAkirayo commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2116573625 Gently ping @grundprinzip in case anything else needs to be provided from my side :)
TakawaAkirayo commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2084451692

> One thing I'm wondering if it might work out of the box is the ability to specify an ephemeral port for the spark connect service and pick this up during startup.
>
> This might alleviate you from using the startup utils to find an open port and rely on the system to provide you with one. Since you don't care about the actual port, that might be good enough.

@grundprinzip I think your idea would definitely work if there were no port limitations. However, one issue we're currently facing internally is that connections between the client side and the server side are restricted to specific ports by internal software firewalls between the Hadoop cluster and the provisioned notebooks. The server could pick up a port that is not allowed by the firewall. So we have to request a whitelist for a range of ports (a one-time request for a pool provisioned on k8s; we cannot request a large range of ports like 0-65535). Therefore, retrying with monotonically increasing port numbers helps keep the port selection within the expected range. Not sure if our case is a corner case; please share your thoughts, thanks!
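[Editor's note] The retry-with-increasing-ports behavior discussed above (in Spark, implemented via `Utils.startServiceOnPort` in Scala) can be sketched in Python. This is a minimal illustration of the idea, not Spark's actual code; the function names here are hypothetical:

```python
import socket

def start_on_port_range(start_port, max_retries, bind_fn):
    """Try start_port, start_port + 1, ... until bind_fn succeeds.

    Keeps the chosen port inside [start_port, start_port + max_retries],
    which matters when only that range is whitelisted by a firewall.
    """
    last_error = None
    for offset in range(max_retries + 1):
        port = start_port + offset
        try:
            return bind_fn(port), port
        except OSError as e:  # port already in use, try the next one
            last_error = e
    raise OSError(
        f"No free port in [{start_port}, {start_port + max_retries}]"
    ) from last_error

def make_server(port):
    # Stand-in for the gRPC server: just bind and listen on a TCP socket.
    s = socket.socket()
    s.bind(("127.0.0.1", port))
    s.listen(1)
    return s
```

If the starting port is occupied, the next port in the whitelisted range is tried, so the bound port never escapes the range the firewall permits.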
TakawaAkirayo commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1584077843

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:

```diff
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
     try {
       try {
         SparkConnectService.start(session.sparkContext)
-        SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-          val isa = sa.asInstanceOf[InetSocketAddress]
-          logInfo(
-            log"Spark Connect server started at: " +
-              log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
-        }
+        val isa = SparkConnectService.bindingAddress
```

Review Comment:
> High level, I think this change is fine, I'm mostly wondering if there is a case where we would have multiple listen sockets, e.g. listening on multiple devices?

@grundprinzip Yes, if we wanted to accept connections from any network, we could listen on all of a machine's network interfaces by adding multiple addresses for the server to listen on. For SparkConnectService, however, the NettyServer is built with a single address, so getListenSockets should only contain one item.
TakawaAkirayo commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1584064102

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:

```diff
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-    val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = bindAddress match {
-      case Some(hostname) =>
-        logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-      case _ => NettyServerBuilder.forPort(port)
+    val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+    val sparkConnectService = new SparkConnectService(debugMode)
+    val protoReflectionService =
+      if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+    val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+    val startService = (port: Int) => {
+      val sb = bindAddress match {
+        case Some(hostname) =>
+          logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+          NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+        case _ => NettyServerBuilder.forPort(port)
+      }
+      sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+        .addService(sparkConnectService)
+
+      // Add all registered interceptors to the server builder.
+      SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
```

Review Comment:
> Why this change? I'm not sure I understand why the interceptor registry needs to be modified.

@grundprinzip Before this change, SparkConnectInterceptorRegistry.chainInterceptors() internally invoked createConfiguredInterceptors(). If chainInterceptors() were placed inside the retried code block (startServiceFn), it could run multiple times, creating the configured interceptors multiple times. After this change, createConfiguredInterceptors() is invoked only once even when there are retries.
TakawaAkirayo commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1584047060

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:

```diff
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
     try {
       try {
         SparkConnectService.start(session.sparkContext)
-        SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-          val isa = sa.asInstanceOf[InetSocketAddress]
-          logInfo(
-            log"Spark Connect server started at: " +
-              log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
-        }
+        val isa = SparkConnectService.bindingAddress
+        logInfo(
+          log"Spark Connect server started at: " +
+            log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
       } catch {
         case e: Exception =>
           logError("Error starting Spark Connect server", e)
           System.exit(-1)
       }
       SparkConnectService.server.awaitTermination()
     } finally {
+      if (SparkConnectService.started) {
+        SparkConnectService.stop()
+      }
       session.stop()
-      SparkConnectService.uiTab.foreach(_.detach())
```

Review Comment:
@grundprinzip SparkConnectService.stop() is used here instead; the uiTab detach is included in stop().

```scala
finally {
  if (SparkConnectService.started) {
    SparkConnectService.stop()
  }
  session.stop()
}
```

In this PR, SparkConnectService.start() posts a service-start event containing the address of the SparkConnectService, and SparkConnectService.stop() posts a service-end event. The purpose is to ensure that these two events always occur in pairs for Spark listeners.
grundprinzip commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2082639091 One thing I'm wondering if it might work out of the box is the ability to specify an ephemeral port for the spark connect service and pick this up during startup. This might alleviate you from using the startup utils to find an open port and rely on the system to provide you with one. Since you don't care about the actual port, that might be good enough.
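[Editor's note] The ephemeral-port approach suggested above is a standard OS facility: bind port 0 and the kernel picks a free port, which must then be read back from the socket. A minimal Python sketch of the mechanism (the function name is illustrative):

```python
import socket

def bind_ephemeral(host="127.0.0.1"):
    """Bind port 0 and let the OS pick a free ephemeral port.

    The actual port is only known after the bind succeeds, so it has to
    be read back from the socket, analogous to reading a gRPC server's
    bound port after start().
    """
    s = socket.socket()
    s.bind((host, 0))          # 0 means "any free port"
    s.listen(1)
    _, port = s.getsockname()  # discover what the OS chose
    return s, port
```

This avoids any retry loop entirely, at the cost of the chosen port being unpredictable, which is exactly the limitation the firewall-whitelist discussion above raises.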
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1583029459

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:

```diff
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-    val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = bindAddress match {
-      case Some(hostname) =>
-        logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-      case _ => NettyServerBuilder.forPort(port)
+    val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+    val sparkConnectService = new SparkConnectService(debugMode)
+    val protoReflectionService =
+      if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+    val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+    val startService = (port: Int) => {
+      val sb = bindAddress match {
+        case Some(hostname) =>
+          logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+          NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+        case _ => NettyServerBuilder.forPort(port)
+      }
+      sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+        .addService(sparkConnectService)
+
+      // Add all registered interceptors to the server builder.
+      SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
+
+      // If debug mode is configured, load the ProtoReflection service so that tools like
+      // grpcurl can introspect the API for debugging.
+      protoReflectionService.foreach(service => sb.addService(service))
+
+      server = sb.build
+      server.start()
+
+      // It will throw an IllegalStateException if you want to access the binding address
+      // while the server is in a terminated state, so record the actual binding address
+      // immediately after the server starts.
+      // There should only be one address, get the actual binding address
+      // of the server according the `server.port()`
+      bindingAddress = server.getListenSockets.asScala
+        .find(_.isInstanceOf[InetSocketAddress])
+        .get
+        .asInstanceOf[InetSocketAddress]
+
+      (server, server.getPort)
     }
-    sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
-      .addService(new SparkConnectService(debugMode))
-
-    // Add all registered interceptors to the server builder.
-    SparkConnectInterceptorRegistry.chainInterceptors(sb)
-    // If debug mode is configured, load the ProtoReflection service so that tools like
-    // grpcurl can introspect the API for debugging.
-    if (debugMode) {
-      sb.addService(ProtoReflectionService.newInstance())
-    }
-    server = sb.build
-    server.start()
+    val maxRetries: Int = SparkEnv.get.conf.get(CONNECT_GRPC_PORT_MAX_RETRIES)
+    Utils.startServiceOnPort[Server](
+      startPort,
+      startService,
+      maxRetries,
+      getClass.getName)
   }

   // Starts the service
-  def start(sc: SparkContext): Unit = {
+  def start(sc: SparkContext): Unit = synchronized {
```

Review Comment:
Thanks
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1582855870

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:

```diff
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-    val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = bindAddress match {
-      case Some(hostname) =>
-        logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-      case _ => NettyServerBuilder.forPort(port)
+    val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+    val sparkConnectService = new SparkConnectService(debugMode)
+    val protoReflectionService =
+      if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+    val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+    val startService = (port: Int) => {
```

Review Comment:
nit: I'd rename the variable to make it easier to follow that this is a closure and not a value.

```suggestion
    val startServiceFn = (port: Int) => {
```
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1582854628

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:

```diff
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-    val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = bindAddress match {
-      case Some(hostname) =>
-        logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-      case _ => NettyServerBuilder.forPort(port)
+    val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+    val sparkConnectService = new SparkConnectService(debugMode)
+    val protoReflectionService =
+      if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+    val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+    val startService = (port: Int) => {
+      val sb = bindAddress match {
+        case Some(hostname) =>
+          logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+          NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+        case _ => NettyServerBuilder.forPort(port)
+      }
+      sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+        .addService(sparkConnectService)
+
+      // Add all registered interceptors to the server builder.
+      SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
```

Review Comment:
Why this change? I'm not sure I understand why the interceptor registry needs to be modified.
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1582852496

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:

```diff
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
     try {
       try {
         SparkConnectService.start(session.sparkContext)
-        SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-          val isa = sa.asInstanceOf[InetSocketAddress]
-          logInfo(
-            log"Spark Connect server started at: " +
-              log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
-        }
+        val isa = SparkConnectService.bindingAddress
```

Review Comment:
High level, I think this change is fine, I'm mostly wondering if there is a case where we would have multiple listen sockets, e.g. listening on multiple devices?
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1582848826

## core/src/main/scala/org/apache/spark/SparkContext.scala:

```diff
@@ -2302,6 +2302,11 @@ class SparkContext(config: SparkConf) extends Logging {
       }
       _dagScheduler = null
     }
+    // In case there are still events being posted during the shutdown of plugins,
+    // invoke the shutdown of each plugin before the listenerBus is stopped.
+    Utils.tryLogNonFatalError {
```

Review Comment:
Thanks!
grundprinzip commented on code in PR #46182: URL: https://github.com/apache/spark/pull/46182#discussion_r1582846934

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:

```diff
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
     try {
       try {
         SparkConnectService.start(session.sparkContext)
-        SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-          val isa = sa.asInstanceOf[InetSocketAddress]
-          logInfo(
-            log"Spark Connect server started at: " +
-              log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
-        }
+        val isa = SparkConnectService.bindingAddress
+        logInfo(
+          log"Spark Connect server started at: " +
+            log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}")
       } catch {
         case e: Exception =>
           logError("Error starting Spark Connect server", e)
           System.exit(-1)
       }
       SparkConnectService.server.awaitTermination()
     } finally {
+      if (SparkConnectService.started) {
+        SparkConnectService.stop()
+      }
       session.stop()
-      SparkConnectService.uiTab.foreach(_.detach())
```

Review Comment:
why remove that?
grundprinzip commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2074921088 Hi @TakawaAkirayo, thanks a lot for the contribution. Please let me have a look at it. This is something we wanted to address, but I need to first spend some time looking at the PR :)
TakawaAkirayo commented on PR #46182: URL: https://github.com/apache/spark/pull/46182#issuecomment-2074148793 cc @grundprinzip @HyukjinKwon Please let me know if this change aligns with the design and usage pattern of SparkConnect, and also whether there are already related plans on the roadmap.
[PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]
TakawaAkirayo opened a new pull request, #46182: URL: https://github.com/apache/spark/pull/46182

### What changes were proposed in this pull request?

1. Add configuration `spark.connect.grpc.port.maxRetries` (default 0, no retries).
   - [Before this PR]: The SparkConnectService would fail to start in case of port conflicts on Yarn.
   - [After this PR]: The internal GRPC server retries new ports until it finds an available one, up to maxRetries.
2. Post a SparkListenerEvent containing the location of the remote SparkConnectService on Yarn.
   - [Before this PR]: We needed to manually find the final location (host and port) of the SparkConnectService on Yarn and then use the SparkConnect client to connect.
   - [After this PR]: The location is posted via the SparkListenerEvents `SparkListenerConnectServiceStarted` and `SparkListenerConnectServiceEnd`, allowing users to register a listener to receive these events and expose the location, for example by sending it to a third-party coordinator server.
3. Shut down SparkPlugins before stopping the ListenerBus.
   - [Before this PR]: If the SparkConnectService was launched via the SparkConnectPlugin, the SparkPlugins were shut down after the ListenerBus stopped, so events posted during the shutdown were not delivered to listeners.
   - [After this PR]: The SparkPlugins are shut down before the ListenerBus stops, ensuring the ListenerBus remains active during the shutdown and listeners receive the SparkConnectService stop event.
4. Minor method refactoring for items 1-3.

### Why are the changes needed?

#### User Story:
Our data analysts and data scientists use Jupyter notebooks provisioned on Kubernetes (k8s) with limited CPU/memory resources to run spark-shell/pyspark for interactive development via terminal under Yarn Client mode. However, Yarn Client mode consumes significant local memory if the job is heavy, and the total k8s resource pool for notebooks is limited.
To leverage the abundant resources of our Hadoop cluster for scalability, we aim to use SparkConnect: the driver runs on Yarn with the SparkConnectService started, and a SparkConnect client connects to that remote driver. To provide a seamless one-command startup experience for both server and client, we've wrapped the following steps in one script:

1) Start a local coordinator server (implemented by us internally, not in this PR) on the Jupyter notebook host.
2) Start the SparkConnectServer by spark-submit via Yarn Cluster mode with the user-provided Spark configurations and the local coordinator server's address and port, appending an additional listener class in the configuration so the SparkConnectService calls back to the coordinator server with its actual address and port on Yarn.
3) Wait for the coordinator server to receive the address callback from the SparkConnectService on Yarn and export the real address.
4) Start the client (`pyspark --remote $callback_address`) with the remote address.

Finally, a remote SparkConnect server is started on Yarn with a local SparkConnect client connected. Users no longer need to start the server beforehand, nor manually discover its address on Yarn before connecting.

#### Problem statement of this change:
1) The specified port for the SparkConnectService GRPC server might be occupied on the node of the Hadoop cluster. To increase the success rate of startup, it needs to retry on conflicts rather than fail directly.
2) Because the final bound port can be uncertain due to the retries in 1), and the remote address is also unpredictable on Yarn, we need to retrieve the address and port programmatically and inject them automatically when starting `pyspark --remote`. To obtain the address of the SparkConnectService on Yarn programmatically, the SparkConnectService needs to communicate its location back to the launcher side.

### Does this PR introduce _any_ user-facing change?

1.
Add configuration `spark.connect.grpc.port.maxRetries` to enable port retries until an available port is found, up to the maximum number of retries.
2. The start and stop events of the SparkConnectService are observable through the SparkListener. Two new events have been introduced:
- SparkListenerConnectServiceStarted: the SparkConnectService (with address and port) is online for serving
- SparkListenerConnectServiceEnd: the SparkConnectService (with address and port) is offline

### How was this patch tested?

By UT, and the feature was verified in our production environment with our binary build.

### Was this patch authored or co-authored using generative AI tooling?

No
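[Editor's note] The launcher workflow in the PR description, where the server reports its final address to a coordinator and the launcher waits for the callback before starting the client, can be sketched with toy glue code. Everything below is illustrative and not part of the PR; the real coordinator is an internal HTTP service, and only the `sc://host:port` connection-string format is Spark Connect's actual convention:

```python
import queue
import threading

# A toy coordinator: the server side reports its final (host, port),
# and the launcher blocks until the callback arrives.
coordinator = queue.Queue(maxsize=1)

def on_connect_service_started(host, port):
    # What a registered SparkListener would do upon receiving
    # SparkListenerConnectServiceStarted: forward the real location.
    coordinator.put((host, port))

def wait_for_remote(timeout=10):
    # What the launcher script does before starting `pyspark --remote`.
    host, port = coordinator.get(timeout=timeout)
    return f"sc://{host}:{port}"

# Simulate the driver on Yarn coming up on an unpredictable host/port.
threading.Thread(
    target=on_connect_service_started, args=("host-123", 15099)
).start()
remote = wait_for_remote()  # remote == "sc://host-123:15099"
```

The client can then be launched with `pyspark --remote "$remote"`, completing the one-command startup described above.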