Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/spark/pull/15009#discussion_r99850931 --- Diff: core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala --- @@ -17,38 +17,67 @@ package org.apache.spark.launcher +import java.io.IOException import java.net.{InetAddress, Socket} import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging import org.apache.spark.launcher.LauncherProtocol._ -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} /** * A class that can be used to talk to a launcher server. Users should extend this class to * provide implementation for the abstract methods. * * See `LauncherServer` for an explanation of how launcher communication works. */ -private[spark] abstract class LauncherBackend { +private[spark] abstract class LauncherBackend extends Logging { private var clientThread: Thread = _ private var connection: BackendConnection = _ private var lastState: SparkAppHandle.State = _ + private var stopOnShutdown: Boolean = false @volatile private var _isConnected = false def connect(): Unit = { val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt) val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET) + val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_FLAG).map(_.toBoolean) if (port != None && secret != None) { - val s = new Socket(InetAddress.getLoopbackAddress(), port.get) - connection = new BackendConnection(s) - connection.send(new Hello(secret.get, SPARK_VERSION)) - clientThread = LauncherBackend.threadFactory.newThread(connection) - clientThread.start() - _isConnected = true + connect(port.get, secret.get, stopFlag.getOrElse(false)) } } + def connect(port: Int, secret: String): Unit = { + val s = new Socket(InetAddress.getLoopbackAddress(), port) + connection = new BackendConnection(s) + connection.send(new Hello(secret, SPARK_VERSION)) + clientThread = LauncherBackend.threadFactory.newThread(connection) + clientThread.start() + _isConnected = true + if (stopOnShutdown) { + logDebug("Adding shutdown hook") // force eager creation of logger + var _shutdownHookRef = ShutdownHookManager.addShutdownHook( + ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => + logInfo("Invoking onStopRequest() from shutdown hook") + try { + if (_isConnected && stopOnShutdown) { + onStopRequest() + } + } + catch { + case anotherIOE: IOException => + logError("Error while running LauncherBackend shutdownHook...", anotherIOE) + } + } + } + } + + def connect(port: Int, secret: String, stopFlag: Boolean): Unit = { --- End diff -- This addressed now.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org