Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19616#discussion_r165518845 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1104,14 +1117,39 @@ private[spark] class Client( if (returnOnRunning && state == YarnApplicationState.RUNNING) { return (state, report.getFinalApplicationStatus) } - + if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled + && !amServiceStarted && report.getAMRMToken != null) { + amServiceStarted = true + startApplicationMasterService(report) + } lastState = state } // Never reached, but keeps compiler happy throw new SparkException("While loop is depleted! This should never happen...") } + private def startApplicationMasterService(report: ApplicationReport) = { + // Add AMRMToken to establish connection between RM and AM + val token = report.getAMRMToken + val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] = + new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token + .getIdentifier().array(), token.getPassword().array, new Text( + token.getKind()), new Text(token.getService())) + val currentUGI = UserGroupInformation.getCurrentUser + currentUGI.addToken(amRMToken) + + System.setProperty( + ApplicationConstants.Environment.CONTAINER_ID.name(), + ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString) + val amArgs = new ApplicationMasterArguments(Array("--arg", + sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port"))) + // Start Application Service in a separate thread and continue with application monitoring + new Thread() { --- End diff -- Don't you want to keep a reference to this thread and join it at some point, to make sure it really goes away? Should it be a daemon thread instead?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org