[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r250824395

## File path: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ##
@@ -87,6 +87,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }
+  test("run Spark in yarn-client mode with unmanaged am") {
+    testBasicYarnApp(true, Map("spark.yarn.unmanagedAM.enabled" -> "true"))

Review comment:
`YARN_UNMANAGED_AM.key`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
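The suggestion is to build the test's configuration from the config constant instead of repeating the string literal. That way, renaming the key cannot silently put the test out of sync. Below is a minimal sketch. It assumes a hypothetical `ConfigEntry` case class as a stand-in for Spark's internal config entry type, and only its `key` field matters here:

```scala
// Hypothetical stand-in for Spark's internal config entry type; only `key` matters here.
final case class ConfigEntry[T](key: String, default: T)

object YarnConfig {
  // Single source of truth for the key string.
  val YARN_UNMANAGED_AM: ConfigEntry[Boolean] =
    ConfigEntry("spark.yarn.unmanagedAM.enabled", default = false)
}

import YarnConfig.YARN_UNMANAGED_AM

// The test's extra configuration refers to the entry's key instead of repeating the literal.
val extraConf: Map[String, String] = Map(YARN_UNMANAGED_AM.key -> "true")
```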
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r250824125

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -298,6 +281,60 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     exitCode
   }
+  def runUnmanaged(
+      clientRpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      stagingDir: Path,
+      cachedResourcesConf: SparkConf): Unit = {
+    try {
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+      val driverRef = clientRpcEnv.setupEndpointRef(
+        RpcAddress(sparkConf.get("spark.driver.host"),
+          sparkConf.get("spark.driver.port").toInt),
+        YarnSchedulerBackend.ENDPOINT_NAME)
+      // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+      registerAM(Utils.localHostName, -1, sparkConf,
+        sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
+      addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
+      createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
+      reporterThread.join()
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + StringUtils.stringifyException(e))
+        if (!unregistered) {

Review comment:
Is this code needed here? Won't it be called when the client calls `stopUnmanaged`?
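The question can be read this way. If the client always calls `stopUnmanaged` on shutdown, the `if (!unregistered)` block in the `catch` is redundant. If both paths are kept, unregistering should at least be idempotent so that the ResourceManager is contacted only once. Below is a sketch under that assumption. It uses a hypothetical `UnmanagedAm` class (not Spark's `ApplicationMaster`) with an `AtomicBoolean` guard:

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical model of the unmanaged AM's teardown path; not Spark's ApplicationMaster.
final class UnmanagedAm {
  private val unregistered = new AtomicBoolean(false)
  private val rmCalls = new AtomicInteger(0)

  // Idempotent: only the first caller reaches the (simulated) ResourceManager.
  def unregister(finalStatus: String): Boolean =
    if (unregistered.compareAndSet(false, true)) {
      rmCalls.incrementAndGet() // stands in for the real unregister call
      true
    } else {
      false
    }

  // What the client would call when the SparkContext stops; a second call is a no-op.
  def stopUnmanaged(): Unit = {
    unregister("SUCCEEDED")
    ()
  }

  def unregisterCount: Int = rmCalls.get
}
```

With this guard, the `catch` path and `stopUnmanaged` can both run without double-unregistering. Whether the `catch` path is needed at all depends on whether the client is guaranteed to call `stopUnmanaged`.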
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248891914

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ##
@@ -236,6 +236,14 @@ package object config {
     .stringConf
     .createOptional
+  /* Unmanaged AM configuration. */
+
+  private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM")

Review comment:
Add `.enabled` to the config key.
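Many boolean switches in Spark's configuration end in `.enabled` (for example `spark.dynamicAllocation.enabled`). The key used by the test quoted earlier in this thread, `spark.yarn.unmanagedAM.enabled`, follows that pattern. Below is a minimal sketch with a hypothetical `BooleanEntry` type (not Spark's `ConfigBuilder`). The `require` check is only an illustrative way to enforce the naming convention:

```scala
// Hypothetical minimal boolean config entry; not Spark's ConfigBuilder.
final case class BooleanEntry(key: String, default: Boolean) {
  // Illustrative guard for the naming convention the review asks for.
  require(key.endsWith(".enabled"), s"boolean flag keys should end with .enabled: $key")

  // Reads the flag from a string map, falling back to the default when unset.
  def readFrom(conf: Map[String, String]): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)
}

val YARN_UNMANAGED_AM = BooleanEntry("spark.yarn.unmanagedAM.enabled", default = false)
```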
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248862598

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -70,6 +73,11 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var amServiceStarted = false

Review comment:
Do you need this extra flag? Could you just check if `appMaster != null`?
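If the AM reference is only assigned once the service starts, the reference itself can answer "has it started?". That removes a second piece of state that could drift out of sync. Below is a sketch with hypothetical `FakeAm` and `ClientSketch` classes. It uses `Option` rather than a `null` check, which is a stylistic choice and not something the review asks for:

```scala
// Hypothetical stand-ins; not Spark's Client or ApplicationMaster.
final class FakeAm(val attempt: String)

final class ClientSketch {
  private var appMaster: Option[FakeAm] = None

  // "Started" is derived from the reference itself, so there is no flag to keep in sync.
  def amStarted: Boolean = appMaster.isDefined

  // Starts the AM at most once; later calls are ignored.
  def maybeStartAm(attempt: String): Unit = {
    if (!amStarted) {
      appMaster = Some(new FakeAm(attempt))
    }
  }

  def currentAttempt: Option[String] = appMaster.map(_.attempt)
}
```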
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248861935

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -744,7 +795,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       // In cluster mode, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
-      if (!isClusterMode) {
+      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {

Review comment:
Update comment above?
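One possible shape for the updated comment is to name both cases the condition now covers. Below is a sketch that pulls the decision into a hypothetical pure function (`DisconnectPolicy.exitOnDriverDisconnect` is not a Spark API). The wording about the unmanaged AM assumes, as elsewhere in this review, that the client stops that AM explicitly:

```scala
// Hypothetical extraction of the onDisconnected decision into a pure function.
object DisconnectPolicy {
  /**
   * In cluster mode, do not rely on the disassociated event to exit; this avoids
   * potentially reporting incorrect exit codes if the driver fails. With an unmanaged
   * AM, the AM runs inside the client, which is expected to stop it explicitly.
   * Only the regular client-mode AM exits when the driver disconnects.
   */
  def exitOnDriverDisconnect(isClusterMode: Boolean, unmanagedAm: Boolean): Boolean =
    !(isClusterMode || unmanagedAm)
}
```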
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248862990

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -70,6 +73,11 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var amServiceStarted = false
+  private var appMaster: ApplicationMaster = _
+  private var unManagedAMStagingDirPath: Path = _

Review comment:
Seems better to just store this in a variable for all cases. It's recomputed from the conf in a few different places in this class.
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243433749

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -324,6 +308,59 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
+  def runUnmanaged(clientRpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      stagingDir: Path): Unit = {
+    try {
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+      // This shutdown hook should run *after* the SparkContext is shut down.

Review comment:
This is client mode, so you can't rely on shutdown hooks. You need to explicitly stop this service when the SparkContext is shut down. Imagine someone just embeds `sc = new SparkContext(); ...; sc.stop()` in their app code, but the app itself runs for way longer than the Spark app.
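The AM's lifetime should be tied to the `SparkContext`, not to the JVM, because a long-running host application may call `sc.stop()` long before it exits. Below is a sketch of that ownership. The hypothetical `AmService` and `ContextSketch` classes stand in for the AM and the context. The AM runs on a daemon thread, a choice made here only so the sketch cannot keep the JVM alive, and it is stopped explicitly and idempotently:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of an in-process AM service owned by the client.
final class AmService {
  private val stopped = new AtomicBoolean(false)
  private val done = new CountDownLatch(1)
  private val thread = new Thread("Unmanaged Application Master Service") {
    override def run(): Unit = done.await() // stands in for the AM's reporter loop
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()

  // Explicit, idempotent stop that does not depend on JVM shutdown hooks.
  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    done.countDown()
    thread.join()
  }

  def isRunning: Boolean = thread.isAlive
}

// Hypothetical stand-in for a SparkContext-like owner: the AM stops with it, not with the JVM.
final class ContextSketch(am: AmService) {
  am.start()
  def stop(): Unit = am.stop()
}
```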
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243433843

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -324,6 +308,59 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
+  def runUnmanaged(clientRpcEnv: RpcEnv,

Review comment:
Multi-line args start on the next line.
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243435612

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1098,14 +1106,41 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled &&
+          !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
+  private def startApplicationMasterService(report: ApplicationReport) = {

Review comment:
`: Unit = `

But given you should be explicitly stopping the AM, this should probably return the AM itself.
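Returning the AM gives the caller a handle to stop it later, which also settles the return type. Below is a minimal sketch with a hypothetical `UnmanagedAmHandle` class. The method name mirrors the diff, but the body is illustrative only:

```scala
// Hypothetical stand-in for the unmanaged AM.
final class UnmanagedAmHandle(val attemptId: String) {
  @volatile private var stopped = false
  def stop(): Unit = { stopped = true }
  def isStopped: Boolean = stopped
}

// An explicit return type, and the AM itself as the result, so the caller can stop it later.
def startApplicationMasterService(attemptId: String): UnmanagedAmHandle = {
  val am = new UnmanagedAmHandle(attemptId)
  // ... start the AM's work on a background thread here ...
  am
}
```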
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714302

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -324,6 +311,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }
+  def runUnmanaged(clientRpcEnv: RpcEnv): Unit = {
+    runImpl {
+      val driverRef = clientRpcEnv.setupEndpointRef(
+        RpcAddress(sparkConf.get("spark.driver.host"),
+          sparkConf.get("spark.driver.port").toInt),
+        YarnSchedulerBackend.ENDPOINT_NAME)
+      // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+      registerAM(
+        Utils.localHostName, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
+      addAmIpFilter(Some(driverRef))
+      createAllocator(driverRef, sparkConf, clientRpcEnv)
+
+      // In client mode the actor will stop the reporter thread.

Review comment:
actor?
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715751

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -297,7 +303,10 @@ private[spark] class Client(
           "does not support it", e)
       }
     }
-
+    if (isClientUnmanagedAMEnabled) {
+      // Set Unmanaged AM to true in Application Submission Context
+      appContext.setUnmanagedAM(true)

Review comment:
`appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)`

This also makes the comment unnecessary.
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242717220

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1098,14 +1109,39 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
+  private def startApplicationMasterService(report: ApplicationReport) = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+        token.getIdentifier().array(), token.getPassword().array,
+        new Text(token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    sparkConf.set("spark.yarn.containerId",
+      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)

Review comment:
Won't this name be the same as the first executor created by the app? I'd rather special-case `getContainerId` to return some baked-in string when the env variable is not set.
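The concern is that `ContainerId.newContainerId(attemptId, 1)` may produce the same ID that the ResourceManager later assigns to the first executor container. The suggested alternative is a fixed fallback when YARN's `CONTAINER_ID` environment variable is absent, which would normally be the case for an AM running inside the client. Below is a sketch that works on strings only. The placeholder value `unmanaged-am` is hypothetical. If a parsed `ContainerId` is required, the placeholder would have to be a valid container ID string:

```scala
object ContainerIdSketch {
  // YARN exports the container id in this environment variable for managed containers.
  val ContainerIdEnv = "CONTAINER_ID"

  // Hypothetical baked-in value for an AM that does not run in a YARN container.
  val UnmanagedAmContainerId = "unmanaged-am"

  // `getenv` is injected so the lookup can be faked; it returns None when the variable is unset.
  def containerIdString(getenv: String => Option[String]): String =
    getenv(ContainerIdEnv).getOrElse(UnmanagedAmContainerId)
}
```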
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714886

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -617,7 +625,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")

Review comment:
`val stagingDir = sys.props.get("...").getOrElse { ... }`
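The suggestion replaces a `var` plus a `null` check with a single immutable `val` built from an `Option`. Below is a sketch of that shape. The original reads `SPARK_YARN_STAGING_DIR` with `System.getenv`, so the lookup here is a `Map` standing in for `sys.env`. The by-name `fallback` parameter is a hypothetical stand-in for the default computation, and it is only evaluated when the variable is missing:

```scala
// Hypothetical helper: `env` stands in for sys.env; `fallback` is only evaluated if needed.
def resolveStagingDir(env: Map[String, String], fallback: => String): String =
  env.get("SPARK_YARN_STAGING_DIR").getOrElse {
    fallback
  }
```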
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715964

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1098,14 +1109,39 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {

Review comment:
indent one more level
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242716228

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1098,14 +1109,39 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
+  private def startApplicationMasterService(report: ApplicationReport) = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+        token.getIdentifier().array(), token.getPassword().array,
+        new Text(token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    sparkConf.set("spark.yarn.containerId",
+      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+    // Start Application Service in a separate thread and continue with application monitoring
+    val amService = new Thread("Unmanaged Application Master Service") {
+      override def run(): Unit = new ApplicationMaster(new ApplicationMasterArguments(Array.empty),

Review comment:
This is a pretty long line. Break it down.
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715378

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -617,7 +625,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
+        if (stagingDir == null) {
+          val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }

Review comment:
This looks similar to the logic in `Client.scala`. Maybe the value calculated there should be plumbed through, instead of adding this code.
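Plumbing the value through means computing the staging directory once on the client side and handing the result to the AM. Below is a sketch with hypothetical `StagingConf`, `StagingDirs` and `AmWithStagingDir` types. The path layout is assumed to roughly mirror the client-side logic the comment refers to: the configured base directory (or the home directory if none is configured), followed by `.sparkStaging/<appId>`:

```scala
// Hypothetical inputs the client already has at submission time.
final case class StagingConf(stagingBaseDir: Option[String], homeDir: String, appId: String)

object StagingDirs {
  // Computed once, on the client: configured base dir (or home dir) + ".sparkStaging/<appId>".
  def appStagingDir(conf: StagingConf): String = {
    val base = conf.stagingBaseDir.getOrElse(conf.homeDir).stripSuffix("/")
    s"$base/.sparkStaging/${conf.appId}"
  }
}

// The AM receives the already-computed path instead of re-deriving it from the configuration.
final class AmWithStagingDir(val stagingDir: String)
```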
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714148

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ##
@@ -234,20 +218,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     resources.toMap
   }
-  def getAttemptId(): ApplicationAttemptId = {
-    client.getAttemptId()
+  def getAttemptId(sparkConf: SparkConf): ApplicationAttemptId = {
+    client.getAttemptId(sparkConf)
   }
   final def run(): Int = {
     doAsUser {
-      runImpl()
+      runImpl(
+        if (isClusterMode) {
+          runDriver()
+        } else {
+          runExecutorLauncher()
+        }
+      )
     }
     exitCode
   }
-  private def runImpl(): Unit = {
+  private def runImpl(opBlock: => Unit): Unit = {

Review comment:
There are things in this method that don't look right when you think about an unmanaged AM. For example, overriding `spark.master`, `spark.ui.port`, etc. looks wrong. The handling of app attempts also seems wrong, since with an unmanaged AM you don't have multiple attempts. Even the shutdown hooks seem a bit out of place. It seems to me it would be easier not to try to use this method for the unmanaged AM.
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240421106

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1084,14 +1095,38 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
+  private def startApplicationMasterService(report: ApplicationReport) = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+        .getIdentifier().array(), token.getPassword().array, new Text(
+        token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    sparkConf.set("spark.yarn.containerId",
+      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+    // Start Application Service in a separate thread and continue with application monitoring
+    val amService = new Thread() {

Review comment:
Thread name?
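Naming the thread makes the in-process AM easy to spot in thread dumps and logs. Below is a minimal sketch that reuses the thread name appearing in a later revision of this diff. The `body` parameter is a hypothetical stand-in for creating and running the `ApplicationMaster`:

```scala
// Starts the AM work on a descriptively named thread; `body` stands in for running the AM.
def startAmThread(body: () => Unit): Thread = {
  val amService = new Thread("Unmanaged Application Master Service") {
    override def run(): Unit = body()
  }
  amService.start()
  amService
}
```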
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240419661

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -656,7 +664,9 @@ private[spark] class Client(
     // Clear the cache-related entries from the configuration to avoid them polluting the
     // UI's environment page. This works for client mode; for cluster mode, this is handled
     // by the AM.
-    CACHE_CONFIGS.foreach(sparkConf.remove)
+    if (!isClientUnmanagedAMEnabled) {

Review comment:
I think this is happening because you're starting the AM after these are removed from the conf. Should probably juggle things around or change how these are provided to the AM, since these configs are super noisy and shouldn't really show up in the UI.
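One way to reorder things is to capture the cache-related entries for the in-process AM before stripping them from the configuration the UI shows. The newer `runUnmanaged` signature quoted earlier in this thread takes a `cachedResourcesConf: SparkConf` parameter, which fits this approach. Below is a sketch that models the configuration as an immutable `Map`. The `prepareConfs` function and the `Prepared` type are hypothetical:

```scala
// Hypothetical result: what the UI sees, and what the in-process AM receives.
final case class Prepared(uiConf: Map[String, String], amCacheConf: Map[String, String])

def prepareConfs(conf: Map[String, String], cacheKeys: Set[String]): Prepared = {
  // Snapshot only the cache-related entries the AM needs...
  val amCacheConf = conf.filter { case (k, _) => cacheKeys.contains(k) }
  // ...then drop them so they do not pollute the UI's environment page.
  Prepared(conf -- cacheKeys, amCacheConf)
}
```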
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240421285

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ##
@@ -1084,14 +1095,38 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
+  private def startApplicationMasterService(report: ApplicationReport) = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token

Review comment:
Keep related calls on the same line (e.g. `token.getIdentifier()`, `new Text(blah)`).
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240419819

## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ##
@@ -183,8 +183,13 @@ object YarnSparkHadoopUtil {
     )
   }
-  def getContainerId: ContainerId = {
-    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+  def getContainerId(sparkConf: SparkConf): ContainerId = {
+      val containerIdString =
+      if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {

Review comment:
Better to use `sparkConf.getenv`.
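Routing the lookup through the configuration object lets tests override the environment; Spark's `SparkConf` has a `getenv` helper for this. Wrapping the result in `Option` also replaces the separate `!= null` check on `System.getenv` in the diff. Below is a sketch with a hypothetical `ConfWithEnv` class standing in for `SparkConf`:

```scala
// Hypothetical stand-in for a config object that routes environment lookups
// through an overridable method, so tests can supply a fake environment.
class ConfWithEnv {
  def getenv(name: String): String = System.getenv(name)
}

// Reads the variable once; a null result (variable unset) falls back to `fallback`.
def containerIdFrom(conf: ConfWithEnv, fallback: String): String =
  Option(conf.getenv("CONTAINER_ID")).getOrElse(fallback)
```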
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode URL: https://github.com/apache/spark/pull/19616#discussion_r240419719 ## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ## @@ -183,8 +183,13 @@ object YarnSparkHadoopUtil { ) } - def getContainerId: ContainerId = { -val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + def getContainerId(sparkConf: SparkConf): ContainerId = { + val containerIdString = Review comment: indentation
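For reference, a sketch of the conventional two-space layout for a multi-line `val` whose value is an `if`/`else` expression, as Spark's Scala code generally uses. The lookup logic is illustrative only: environment and config are plain maps, and the fallback key `example.containerId` is hypothetical, not the one used in the PR.

```scala
object ContainerIdStyle {
  val ContainerIdEnv = "CONTAINER_ID"
  // Hypothetical fallback key, for illustration only.
  val FallbackKey = "example.containerId"

  def containerIdString(env: Map[String, String], conf: Map[String, String]): Option[String] = {
    // `val` sits two spaces inside the method body; the multi-line `if`
    // expression that produces its value is indented two more spaces.
    val containerIdString =
      if (env.contains(ContainerIdEnv)) {
        env.get(ContainerIdEnv)
      } else {
        conf.get(FallbackKey)
      }
    containerIdString
  }
}
```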
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode URL: https://github.com/apache/spark/pull/19616#discussion_r240421526 ## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ## @@ -51,32 +52,27 @@ import org.apache.spark.util._ /** * Common application master functionality for Spark on Yarn. */ -private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging { +private[spark] class ApplicationMaster( +val args: ApplicationMasterArguments, +val sparkConf: SparkConf, +val yarnConf: YarnConfiguration) + extends Logging { + def this(sparkConf: SparkConf, Review comment: See the constructor above for the multi-line argument style.
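A sketch of the multi-line argument style the comment points to, applied to both the primary and the auxiliary constructor: one parameter per line indented four spaces, `extends` on its own line indented two spaces, and a blank line after the class header. `AmLikeExample` and its `Seq`/`Map` parameters are hypothetical stand-ins for `ApplicationMaster`, `ApplicationMasterArguments`, `SparkConf` and `YarnConfiguration`; the real auxiliary constructor's second parameter is cut off in the hunk and is not reproduced here.

```scala
// Hypothetical stand-in for the AM class; only the parameter layout matters here.
class AmLikeExample(
    val args: Seq[String],
    val conf: Map[String, String],
    val yarnConf: Map[String, String])
  extends Serializable {

  // Auxiliary constructor: parameters also go one per line, indented four spaces.
  def this(
      conf: Map[String, String],
      yarnConf: Map[String, String]) = {
    this(Seq.empty, conf, yarnConf)
  }
}
```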
[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode URL: https://github.com/apache/spark/pull/19616#discussion_r240422378 ## File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ## @@ -481,20 +478,29 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } private def runExecutorLauncher(): Unit = { -val hostname = Utils.localHostName -val amCores = sparkConf.get(AM_CORES) -rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr, - amCores, true) - -// The client-mode AM doesn't listen for incoming connections, so report an invalid port. -registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress")) - -// The driver should be up and listening, so unlike cluster mode, just try to connect to it -// with no waiting or retrying. -val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0)) -val driverRef = rpcEnv.setupEndpointRef( - RpcAddress(driverHost, driverPort), - YarnSchedulerBackend.ENDPOINT_NAME) +var driverRef : RpcEndpointRef = null +if (sparkConf.get(YARN_UNMANAGED_AM)) { Review comment: I'm not a big fan of this change. It feels like you should have a different method here, called `runUnmanaged`, that is called instead of `run()` and takes an `RpcEnv`. That way you don't need to keep `clientRpcEnv` at all; it would be local to that method, since nothing else here needs it. In fact, even `rpcEnv` could go away and become a parameter to `createAllocator`...
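A minimal sketch of the suggested shape, under stated assumptions: `RpcEnvLike`, `Allocator` and `AmSketch` are hypothetical stand-ins, not Spark's `RpcEnv` or `ApplicationMaster`. The point it illustrates is that the unmanaged entry point receives the client's environment as a parameter and hands it straight to `createAllocator`, so the AM class keeps no RPC environment field of its own.

```scala
// Hypothetical minimal stand-in for an RPC environment (not Spark's RpcEnv API).
trait RpcEnvLike {
  def name: String
}

final class Allocator(val env: RpcEnvLike)

class AmSketch {
  // Only the allocator is kept as state; AmSketch has no RPC environment field.
  private var allocator: Option[Allocator] = None

  def allocatorEnvName: Option[String] = allocator.map(_.env.name)

  // Managed path: the AM creates its own environment, local to this method.
  def run(): Unit = {
    val env = new RpcEnvLike { val name = "sparkYarnAM" }
    createAllocator(env)
  }

  // Unmanaged path: the client's environment is passed in and used directly.
  def runUnmanaged(clientEnv: RpcEnvLike): Unit = {
    createAllocator(clientEnv)
  }

  // The environment arrives as a parameter, as the review comment proposes.
  private def createAllocator(env: RpcEnvLike): Unit = {
    allocator = Some(new Allocator(env))
  }
}
```

Keeping the environment as a parameter makes the data flow explicit: each entry point decides which environment the allocator uses, and no mutable field has to be initialized on one path and ignored on the other.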