Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15009#discussion_r99850931
  
    --- Diff: 
core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala ---
    @@ -17,38 +17,67 @@
     
     package org.apache.spark.launcher
     
    +import java.io.IOException
     import java.net.{InetAddress, Socket}
     
     import org.apache.spark.SPARK_VERSION
    +import org.apache.spark.internal.Logging
     import org.apache.spark.launcher.LauncherProtocol._
    -import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
     
     /**
      * A class that can be used to talk to a launcher server. Users should 
extend this class to
      * provide implementation for the abstract methods.
      *
      * See `LauncherServer` for an explanation of how launcher communication 
works.
      */
    -private[spark] abstract class LauncherBackend {
    +private[spark] abstract class LauncherBackend extends Logging {
     
       private var clientThread: Thread = _
       private var connection: BackendConnection = _
       private var lastState: SparkAppHandle.State = _
    +  private var stopOnShutdown: Boolean = false
       @volatile private var _isConnected = false
     
       def connect(): Unit = {
         val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
         val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
    +    val stopFlag = 
sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG).map(_.toBoolean)
         if (port != None && secret != None) {
    -      val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
    -      connection = new BackendConnection(s)
    -      connection.send(new Hello(secret.get, SPARK_VERSION))
    -      clientThread = LauncherBackend.threadFactory.newThread(connection)
    -      clientThread.start()
    -      _isConnected = true
    +      connect(port.get, secret.get, stopFlag.getOrElse(false))
         }
       }
     
    +  def connect(port: Int, secret: String): Unit = {
    +    val s = new Socket(InetAddress.getLoopbackAddress(), port)
    +    connection = new BackendConnection(s)
    +    connection.send(new Hello(secret, SPARK_VERSION))
    +    clientThread = LauncherBackend.threadFactory.newThread(connection)
    +    clientThread.start()
    +    _isConnected = true
    +    if (stopOnShutdown) {
    +      logDebug("Adding shutdown hook") // force eager creation of logger
    +      var _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    +        ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    +        logInfo("Invoking onStopRequest() from shutdown hook")
    +        try {
    +          if (_isConnected && stopOnShutdown) {
    +            onStopRequest()
    +          }
    +        }
    +        catch {
    +          case anotherIOE: IOException =>
    +            logError("Error while running LauncherBackend 
shutdownHook...", anotherIOE)
    +        }
    +      }
    +    }
    +  }
    +
    +  def connect(port: Int, secret: String, stopFlag: Boolean): Unit = {
    --- End diff --
    
    This is addressed now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to