Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-07-02 Thread via GitHub


jshmchenxi commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2194649550

   @TakawaAkirayo That's good to know. Thanks for sharing!
   
   Side question: how do you manage the termination of Spark Connect servers? JupyterHub users usually restart or stop kernels; would that lead to leaked Connect servers?





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-06-06 Thread via GitHub


HyukjinKwon closed pull request #46182: [SPARK-47952][CORE][CONNECT] Support 
retrieving the real SparkConnectService GRPC address and port programmatically 
when running on Yarn
URL: https://github.com/apache/spark/pull/46182





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-06-06 Thread via GitHub


HyukjinKwon commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2153602043

   Merged to master.





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-06-06 Thread via GitHub


TakawaAkirayo commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2152575211

   Thanks for your review, @grundprinzip @HyukjinKwon! After rebasing and fixing, the CI failure is now resolved.





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-06-02 Thread via GitHub


HyukjinKwon commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2144252644

   Let's fix up the CI failure. Otherwise, looks good to go.





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-05-16 Thread via GitHub


TakawaAkirayo commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2116573625

   Gently ping @grundprinzip if anything else needs to be provided from my side :)





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


TakawaAkirayo commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2084451692

   > One thing I'm wondering is whether it might work out of the box to specify an ephemeral port for the Spark Connect service and pick it up during startup.
   > 
   > This might alleviate the need to use the startup utils to find an open port and let you rely on the system to provide one. Since you don't care about the actual port, that might be good enough.
   
   @grundprinzip I think your idea could definitely work if there were no port limitations.
   
   However, one issue we're currently facing internally is that connections between the client side and the server side are restricted to certain ports by internal software firewalls between the Hadoop cluster and the provisioned notebooks, so the server could pick up a port that is not allowed by the firewall.
   We therefore have to request a whitelist for a range of ports (a one-time request for the pool provisioned on k8s; we cannot request a huge range like 0-65535).
   
   Therefore, retrying with monotonically increasing port numbers helps keep the selected port within the expected range.
   
   I'm not sure whether ours is a corner case; please share your thoughts, thanks!
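   
   For illustration, a minimal sketch of that retry strategy (the PR's actual implementation goes through Spark's `Utils.startServiceOnPort`; the helper name and signature here are purely illustrative):
   
   ```scala
   import java.net.{BindException, ServerSocket}
   
   // Sketch only: try monotonically increasing ports so the bound port stays
   // inside a whitelisted range [startPort, startPort + maxRetries].
   def bindWithRetries(startPort: Int, maxRetries: Int): ServerSocket = {
     val candidates = startPort to math.min(startPort + maxRetries, 65535)
     candidates.view
       .map { port =>
         try Some(new ServerSocket(port))        // free port: bind succeeds
         catch { case _: BindException => None } // occupied: advance to the next port
       }
       .collectFirst { case Some(socket) => socket }
       .getOrElse(throw new BindException(
         s"no free port in ${candidates.head}-${candidates.last}"))
   }
   ```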





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


TakawaAkirayo commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1584077843


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:
##
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
 try {
   try {
 SparkConnectService.start(session.sparkContext)
-SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-  val isa = sa.asInstanceOf[InetSocketAddress]
-  logInfo(
-log"Spark Connect server started at: " +
-  log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
-}
+val isa = SparkConnectService.bindingAddress

Review Comment:
   > High level, I think this change is fine. I'm mostly wondering if there is a case where we would have multiple listen sockets, e.g. listening on multiple devices?
   
   @grundprinzip Yes, if we wanted to listen on all of a machine's network interfaces to accept connections from any network, we could add multiple addresses for the server to listen on.
   
   For SparkConnectService, however, the NettyServer is built with a single address, so getListenSockets should only contain one item.






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


TakawaAkirayo commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1584064102


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
 val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
 val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-val sb = bindAddress match {
-  case Some(hostname) =>
-logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-  case _ => NettyServerBuilder.forPort(port)
+val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+val sparkConnectService = new SparkConnectService(debugMode)
+val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+val startService = (port: Int) => {
+  val sb = bindAddress match {
+case Some(hostname) =>
+  logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+  NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+case _ => NettyServerBuilder.forPort(port)
+  }
+  sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+.addService(sparkConnectService)
+
+  // Add all registered interceptors to the server builder.
+  SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)

Review Comment:
   > Why this change? I'm not sure I understand why the interceptor registry needs to be modified.
   
   @grundprinzip Before this change, SparkConnectInterceptorRegistry.chainInterceptors() would internally invoke createConfiguredInterceptors(). Therefore, if chainInterceptors() were placed within the retried code block (startServiceFn), it could run multiple times, creating the configured interceptors multiple times.
   
   After this change, createConfiguredInterceptors() is invoked only once, even if there are retries.
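   
   A minimal sketch of that hoisting pattern, with purely illustrative names (not the actual registry API): the side-effecting construction runs once, outside the closure that the port-retry logic may invoke repeatedly.
   
   ```scala
   trait Interceptor
   final class AuthInterceptor extends Interceptor // hypothetical example interceptor
   
   // Expensive / side-effecting; must run exactly once.
   def createConfiguredInterceptors(): Seq[Interceptor] = Seq(new AuthInterceptor)
   
   val configuredInterceptors = createConfiguredInterceptors() // invoked exactly once
   
   val startServiceFn = (port: Int) => {
     // May run several times on port conflicts, but it only reuses the
     // interceptors captured above instead of re-creating them per attempt.
     println(s"binding port $port with ${configuredInterceptors.size} interceptor(s)")
   }
   
   startServiceFn(15002) // first attempt; a retry would call it again with the next port
   ```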






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


TakawaAkirayo commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1584047060


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:
##
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
 try {
   try {
 SparkConnectService.start(session.sparkContext)
-SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-  val isa = sa.asInstanceOf[InetSocketAddress]
-  logInfo(
-log"Spark Connect server started at: " +
-  log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
-}
+val isa = SparkConnectService.bindingAddress
+logInfo(
+  log"Spark Connect server started at: " +
+log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
   } catch {
 case e: Exception =>
   logError("Error starting Spark Connect server", e)
   System.exit(-1)
   }
   SparkConnectService.server.awaitTermination()
 } finally {
+  if (SparkConnectService.started) {
+SparkConnectService.stop()
+  }
   session.stop()
-  SparkConnectService.uiTab.foreach(_.detach())

Review Comment:
   @grundprinzip SparkConnectService.stop() is used here; the uiTab detaching is contained in stop():
   
   ```
   finally {
 if (SparkConnectService.started) {
   SparkConnectService.stop()
 }
 session.stop()
   }
   ```
   
   In this PR, the SparkConnectService.start() method posts a Service Start event containing the address of the SparkConnectService, while SparkConnectService.stop() posts a Service End event. The purpose is to ensure that these two events always occur in pairs for Spark listeners.






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2082639091

   One thing I'm wondering is whether it might work out of the box to specify an ephemeral port for the Spark Connect service and pick it up during startup.
   
   This might alleviate the need to use the startup utils to find an open port and let you rely on the system to provide one. Since you don't care about the actual port, that might be good enough.
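   
   For reference, a minimal sketch of that idea using grpc-java's builder directly (no services registered here, just the binding mechanics; a real server would add its services before build()):
   
   ```scala
   import io.grpc.netty.NettyServerBuilder
   
   // Port 0 asks the OS for any free ephemeral port; after start(),
   // getPort reports the port that was actually assigned.
   val server = NettyServerBuilder.forPort(0).build().start()
   val actualPort = server.getPort // the ephemeral port the OS handed out
   println(s"gRPC server bound to ephemeral port $actualPort")
   server.shutdown()
   ```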





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1583029459


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
 val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
 val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-val sb = bindAddress match {
-  case Some(hostname) =>
-logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-  case _ => NettyServerBuilder.forPort(port)
+val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+val sparkConnectService = new SparkConnectService(debugMode)
+val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+val startService = (port: Int) => {
+  val sb = bindAddress match {
+case Some(hostname) =>
+  logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+  NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+case _ => NettyServerBuilder.forPort(port)
+  }
+  sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+.addService(sparkConnectService)
+
+  // Add all registered interceptors to the server builder.
+  SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
+
+  // If debug mode is configured, load the ProtoReflection service so that tools like
+  // grpcurl can introspect the API for debugging.
+  protoReflectionService.foreach(service => sb.addService(service))
+
+  server = sb.build
+  server.start()
+
+  // It will throw an IllegalStateException if you want to access the binding address
+  // while the server is in a terminated state, so record the actual binding address
+  // immediately after the server starts.
+  // There should only be one address; get the actual binding address
+  // of the server according to `server.port()`.
+  bindingAddress = server.getListenSockets.asScala
+.find(_.isInstanceOf[InetSocketAddress])
+.get
+.asInstanceOf[InetSocketAddress]
+
+  (server, server.getPort)
 }
-sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
-  .addService(new SparkConnectService(debugMode))
-
-// Add all registered interceptors to the server builder.
-SparkConnectInterceptorRegistry.chainInterceptors(sb)
 
-// If debug mode is configured, load the ProtoReflection service so that tools like
-// grpcurl can introspect the API for debugging.
-if (debugMode) {
-  sb.addService(ProtoReflectionService.newInstance())
-}
-server = sb.build
-server.start()
+val maxRetries: Int = SparkEnv.get.conf.get(CONNECT_GRPC_PORT_MAX_RETRIES)
+Utils.startServiceOnPort[Server](
+  startPort,
+  startService,
+  maxRetries,
+  getClass.getName
+)
   }
 
   // Starts the service
-  def start(sc: SparkContext): Unit = {
+  def start(sc: SparkContext): Unit = synchronized {

Review Comment:
   Thanks






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1582855870


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
 val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
 val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-val sb = bindAddress match {
-  case Some(hostname) =>
-logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-  case _ => NettyServerBuilder.forPort(port)
+val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+val sparkConnectService = new SparkConnectService(debugMode)
+val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+val startService = (port: Int) => {

Review Comment:
   nit: I'd rename the variable to make it easier to follow that this is a closure and not a value.
   
   ```suggestion
   val startServiceFn = (port: Int) => {
   ```






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1582854628


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
 val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
 val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-val sb = bindAddress match {
-  case Some(hostname) =>
-logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-  case _ => NettyServerBuilder.forPort(port)
+val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+val sparkConnectService = new SparkConnectService(debugMode)
+val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+val startService = (port: Int) => {
+  val sb = bindAddress match {
+case Some(hostname) =>
+  logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+  NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+case _ => NettyServerBuilder.forPort(port)
+  }
+  sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+.addService(sparkConnectService)
+
+  // Add all registered interceptors to the server builder.
+  SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)

Review Comment:
   Why this change? I'm not sure I understand why the interceptor registry needs to be modified.






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1582852496


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:
##
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
 try {
   try {
 SparkConnectService.start(session.sparkContext)
-SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-  val isa = sa.asInstanceOf[InetSocketAddress]
-  logInfo(
-log"Spark Connect server started at: " +
-  log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
-}
+val isa = SparkConnectService.bindingAddress

Review Comment:
   High level, I think this change is fine. I'm mostly wondering if there is a case where we would have multiple listen sockets, e.g. listening on multiple devices?






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1582848826


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -2302,6 +2302,11 @@ class SparkContext(config: SparkConf) extends Logging {
   }
   _dagScheduler = null
 }
+// In case there are still events being posted during the shutdown of plugins,
+// invoke the shutdown of each plugin before the listenerBus is stopped.
+Utils.tryLogNonFatalError {

Review Comment:
   Thanks!






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-29 Thread via GitHub


grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1582846934


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala:
##
@@ -36,21 +32,21 @@ object SparkConnectServer extends Logging {
 try {
   try {
 SparkConnectService.start(session.sparkContext)
-SparkConnectService.server.getListenSockets.asScala.foreach { sa =>
-  val isa = sa.asInstanceOf[InetSocketAddress]
-  logInfo(
-log"Spark Connect server started at: " +
-  log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
-}
+val isa = SparkConnectService.bindingAddress
+logInfo(
+  log"Spark Connect server started at: " +
+log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, 
isa.getPort)}")
   } catch {
 case e: Exception =>
   logError("Error starting Spark Connect server", e)
   System.exit(-1)
   }
   SparkConnectService.server.awaitTermination()
 } finally {
+  if (SparkConnectService.started) {
+SparkConnectService.stop()
+  }
   session.stop()
-  SparkConnectService.uiTab.foreach(_.detach())

Review Comment:
   why remove that?






Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-24 Thread via GitHub


grundprinzip commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2074921088

   Hi @TakawaAkirayo, thanks a lot for the contribution. Please let me have a look at it. This is something we wanted to address, but I first need to spend some time looking at the PR :)





Re: [PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-23 Thread via GitHub


TakawaAkirayo commented on PR #46182:
URL: https://github.com/apache/spark/pull/46182#issuecomment-2074148793

   cc @grundprinzip @HyukjinKwon Please let me know whether this change aligns with the design and usage patterns of SparkConnect, and whether there are already related plans on the roadmap.





[PR] [SPARK-47952][CORE][CONNECT] Support retrieving the real SparkConnectService GRPC address and port programmatically when running on Yarn [spark]

2024-04-23 Thread via GitHub


TakawaAkirayo opened a new pull request, #46182:
URL: https://github.com/apache/spark/pull/46182

   
   
   ### What changes were proposed in this pull request?
   1. Add configuration `spark.connect.grpc.port.maxRetries` (default 0, i.e. no retries).
   
  [Before this PR]: The SparkConnectService would fail to start in case of port conflicts on Yarn.
  [After this PR]: The internal GRPC server retries successive ports until it finds an available one, up to maxRetries.
   
   2. Post a SparkListenerEvent containing the location of the remote SparkConnectService on Yarn.
   
  [Before this PR]: We needed to manually find the final location (host and port) of the SparkConnectService on Yarn and then use the SparkConnect client to connect.
  [After this PR]: The location is posted via two SparkListenerEvents:
 `SparkListenerConnectServiceStarted`
 `SparkListenerConnectServiceEnd`
This allows users to register a listener that receives these events and exposes the location, for example by sending it to a third-party coordinator server (see the sketch after this list).
   
   3. Shut down SparkPlugins before stopping the ListenerBus.
   
  [Before this PR]: If the SparkConnectService was launched via the SparkConnectPlugin, the SparkPlugins would be shut down after the ListenerBus was stopped, so events posted during the shutdown were not delivered to listeners.
  [After this PR]: The SparkPlugins are shut down before the ListenerBus stops, ensuring that the ListenerBus remains active during the shutdown and listeners receive the SparkConnectService stop event.
   
   4. Minor method refactoring for 1~3.
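   
   A minimal sketch of the listener mentioned in item 2 (the event case classes and their fields are illustrative here; see the PR diff for the actual definitions):
   
   ```scala
   import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
   
   // Illustrative event shapes only; the real case classes are defined in this PR.
   case class SparkListenerConnectServiceStarted(host: String, port: Int) extends SparkListenerEvent
   case class SparkListenerConnectServiceEnd(host: String, port: Int) extends SparkListenerEvent
   
   // Forwards the service location to an external coordinator. Register it via
   // --conf spark.extraListeners=<fully.qualified.ClassName>.
   class ConnectServiceAddressListener extends SparkListener {
     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
       case e: SparkListenerConnectServiceStarted =>
         notifyCoordinator(s"started ${e.host}:${e.port}") // e.g. HTTP POST to the coordinator
       case e: SparkListenerConnectServiceEnd =>
         notifyCoordinator(s"ended ${e.host}:${e.port}")
       case _ => // ignore unrelated events
     }
     private def notifyCoordinator(message: String): Unit = println(message) // placeholder transport
   }
   ```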
   
   
   ### Why are the changes needed?
   #### User story
   Our data analysts and data scientists use Jupyter notebooks provisioned on Kubernetes (k8s) with limited CPU/memory resources to run spark-shell/pyspark for interactive development via the terminal under Yarn client mode.
   
   However, Yarn client mode consumes significant local memory if the job is heavy, and the total k8s resource pool for notebooks is limited.
   
   To leverage the abundant resources of our Hadoop cluster for scalability, we aim to use SparkConnect: run the driver on Yarn with the SparkConnectService started, and connect to that remote driver with a SparkConnect client.
   
   To provide a seamless experience with one-command startup for both server and client, we've wrapped the following steps in one script:
   
   1) Start a local coordinator server (implemented by us internally, not in this PR) on the host of the Jupyter notebook.
   2) Start the SparkConnectServer via spark-submit in Yarn cluster mode with the user-supplied Spark configurations plus the local coordinator server's address and port, appending an additional listener class to the configuration so the SparkConnectService calls back to the coordinator server with its actual address and port on Yarn.
   3) Wait for the coordinator server to receive the address callback from the SparkConnectService on Yarn and export the real address.
   4) Start the client (pyspark --remote $callback_address) with the remote address.
   
   In the end, a remote SparkConnect server is started on Yarn with a local SparkConnect client connected to it. Users no longer need to start the server beforehand and then connect to it after manually looking up its address on Yarn.
   
   #### Problem statement of this change
   1) The specified port for the SparkConnectService GRPC server might be occupied on the node of the Hadoop cluster. To increase the success rate of startup, the server needs to retry on conflicts rather than fail directly.
   2) Because the final binding port can be uncertain when retrying per 1), and the remote address is also unpredictable on Yarn, we need to retrieve the address and port programmatically and inject them automatically when starting `pyspark --remote`. To obtain the address of the SparkConnectService on Yarn programmatically, the service needs to communicate its location back to the launcher side.
   
   
   ### Does this PR introduce _any_ user-facing change?
   1. Add configuration `spark.connect.grpc.port.maxRetries` to enable port 
retries until an available port is found before reaching the maximum number of 
retries.
   
   3. The start and stop events of the SparkConnectService are observable 
through the SparkListener.
  Two new events have been introduced:
  - SparkListenerConnectServiceStarted: the SparkConnectService(with 
address and port) tis online for serving
  - SparkListenerConnectServiceEnd: the SparkConnectService(with address 
and port) is offline
   
   ### How was this patch tested?
   By unit tests, and by verifying the feature in our production environment with our binary build.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No

