[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-24 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r250824395
 
 

 ##
 File path: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 ##
 @@ -87,6 +87,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
 testBasicYarnApp(false)
   }
 
+  test("run Spark in yarn-client mode with unmanaged am") {
+testBasicYarnApp(true, Map("spark.yarn.unmanagedAM.enabled" -> "true"))
 
 Review comment:
   `YARN_UNMANAGED_AM.key`
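
A minimal sketch of what this suggestion amounts to, assuming the config entry exposes its key as a `key` field. `ConfEntry` and `Config` below are hypothetical stand-ins for Spark's internal config classes, not the real API; the key string is the one used in the test above.

```scala
// Hypothetical stand-in for Spark's internal config entry; only `key` matters here.
final case class ConfEntry(key: String, default: Boolean)

object Config {
  // Assumed definition site for the setting used by the test.
  val YARN_UNMANAGED_AM: ConfEntry =
    ConfEntry("spark.yarn.unmanagedAM.enabled", default = false)
}

object ConfKeyExample {
  // Build the extra test settings from the constant instead of repeating the literal,
  // so renaming the key cannot silently break the test.
  val extraConf: Map[String, String] = Map(Config.YARN_UNMANAGED_AM.key -> "true")
}
```

With this shape, the test call would pass `Map(YARN_UNMANAGED_AM.key -> "true")` rather than a hard-coded string.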


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-24 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r250824125
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -298,6 +281,60 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 exitCode
   }
 
+  def runUnmanaged(
+  clientRpcEnv: RpcEnv,
+  appAttemptId: ApplicationAttemptId,
+  stagingDir: Path,
+  cachedResourcesConf: SparkConf): Unit = {
+try {
+  new CallerContext(
+"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+  val driverRef = clientRpcEnv.setupEndpointRef(
+RpcAddress(sparkConf.get("spark.driver.host"),
+  sparkConf.get("spark.driver.port").toInt),
+YarnSchedulerBackend.ENDPOINT_NAME)
+  // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+  registerAM(Utils.localHostName, -1, sparkConf,
+sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
+  addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
+  createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
+  reporterThread.join()
+} catch {
+  case e: Exception =>
+// catch everything else if not specifically handled
+logError("Uncaught exception: ", e)
+finish(FinalApplicationStatus.FAILED,
+  ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+  "Uncaught exception: " + StringUtils.stringifyException(e))
+if (!unregistered) {
 
 Review comment:
   Is this code needed here? Won't it be called when the client calls 
`stopUnmanaged`?
   
   





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-17 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248891914
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 ##
 @@ -236,6 +236,14 @@ package object config {
   .stringConf
   .createOptional
 
+  /* Unmanaged AM configuration. */
+
+  private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM")
 
 Review comment:
   Add `.enabled` to the config key.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-17 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248862598
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -70,6 +73,11 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
 
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var amServiceStarted = false
 
 Review comment:
   Do you need this extra flag? Could you just check if `appMaster != null`?
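
A minimal sketch of the alternative, assuming the AM reference is only ever assigned when the service is started. `FakeAppMaster` and `ClientSketch` are hypothetical names used for illustration; they are not the classes in the PR.

```scala
// Hypothetical stand-in for the real ApplicationMaster.
final class FakeAppMaster

final class ClientSketch {
  // Single source of truth: a non-null reference means the AM service was started.
  private var appMaster: FakeAppMaster = null

  def amStarted: Boolean = appMaster != null

  // Starts the AM at most once; returns true only on the call that actually started it.
  def startIfNeeded(): Boolean = {
    if (appMaster == null) {
      appMaster = new FakeAppMaster
      true
    } else {
      false
    }
  }
}
```

Dropping the separate boolean removes the possibility of the flag and the reference disagreeing.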





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-17 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248861935
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -744,7 +795,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 override def onDisconnected(remoteAddress: RpcAddress): Unit = {
   // In cluster mode, do not rely on the disassociated event to exit
   // This avoids potentially reporting incorrect exit codes if the driver fails
-  if (!isClusterMode) {
+  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
 
 Review comment:
   Update comment above?





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2019-01-17 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r248862990
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -70,6 +73,11 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
 
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var amServiceStarted = false
+  private var appMaster: ApplicationMaster = _
+  private var unManagedAMStagingDirPath: Path = _
 
 Review comment:
   Seems better to just store this in a variable for all cases. It's recomputed 
from the conf in a few different places in this class.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-20 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243433749
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -324,6 +308,59 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
   }
 
+  def runUnmanaged(clientRpcEnv: RpcEnv,
+  appAttemptId: ApplicationAttemptId,
+  stagingDir: Path): Unit = {
+try {
+  new CallerContext(
+"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+  // This shutdown hook should run *after* the SparkContext is shut down.
 
 Review comment:
   This is client mode, so you can't rely on shutdown hooks. You need to 
explicitly stop this service when the SparkContext is shut down.
   
   Imagine someone just embeds `sc = new SparkContext(); ...; sc.stop()` in 
their app code, but the app itself runs for way longer than the Spark app.
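
The scenario can be sketched as follows, assuming the service is owned by the context that created it. `UnmanagedService` and `ContextSketch` are hypothetical stand-ins, not Spark classes; the point is only that `stop()` tears the service down directly instead of waiting for a JVM shutdown hook.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the in-process (unmanaged) AM service.
final class UnmanagedService {
  private val running = new AtomicBoolean(false)
  def start(): Unit = running.set(true)
  def stop(): Unit = running.set(false)
  def isRunning: Boolean = running.get()
}

// Hypothetical stand-in for the context that owns the service.
final class ContextSketch(service: UnmanagedService) {
  service.start()

  // Explicit teardown: the host application may keep running long after this call,
  // so a JVM shutdown hook would fire far too late.
  def stop(): Unit = service.stop()
}
```

An application that calls `stop()` and then carries on with unrelated work no longer has the service running in the background.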





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-20 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243433843
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -324,6 +308,59 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
   }
 
+  def runUnmanaged(clientRpcEnv: RpcEnv,
 
 Review comment:
   Multi-line args start on the next line.
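
A small illustration of the layout being asked for, assuming the usual Spark convention of a four-space indent for wrapped parameters and a two-space indent for the body. `ArgsStyle.describe` is a hypothetical method used only to show the shape.

```scala
object ArgsStyle {
  // The parameter list starts on the line after the method name; each parameter
  // is indented four spaces, and the body is indented two spaces.
  def describe(
      host: String,
      port: Int,
      stagingDir: String): String = {
    s"$host:$port -> $stagingDir"
  }
}
```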





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-20 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r243435612
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1098,14 +1106,41 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled 
&&
+  !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never 
happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
 
 Review comment:
   `: Unit = `
   
   But given you should be explicitly stopping the AM, this should probably 
return the AM itself.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714302
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -324,6 +311,23 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
   }
 
+  def runUnmanaged(clientRpcEnv: RpcEnv): Unit = {
+runImpl {
+  val driverRef = clientRpcEnv.setupEndpointRef(
+RpcAddress(sparkConf.get("spark.driver.host"),
+  sparkConf.get("spark.driver.port").toInt),
+YarnSchedulerBackend.ENDPOINT_NAME)
+  // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+  registerAM(
+Utils.localHostName, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
+  addAmIpFilter(Some(driverRef))
+  createAllocator(driverRef, sparkConf, clientRpcEnv)
+
+  // In client mode the actor will stop the reporter thread.
 
 Review comment:
   actor?





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715751
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -297,7 +303,10 @@ private[spark] class Client(
 "does not support it", e)
   }
 }
-
+if (isClientUnmanagedAMEnabled) {
+  // Set Unmanaged AM to true in Application Submission Context
+  appContext.setUnmanagedAM(true)
 
 Review comment:
   `appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)`
   
   Which also makes the comment unnecessary.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242717220
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1098,14 +1109,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never 
happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] 
=
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+token.getIdentifier().array(), token.getPassword().array,
+new Text(token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+sparkConf.set("spark.yarn.containerId",
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 
1).toString)
 
 Review comment:
   Won't this name be the same as the first executor created by the app?
   
   I'd rather special-case `getContainerId` to return some baked-in string when 
the env variable is not set.
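
One way to read this suggestion, as a sketch only: look up the container ID in the environment and fall back to a fixed placeholder when it is absent. The environment is passed in as a plain map so the behaviour is easy to check; `FallbackId` and its value are hypothetical, and `"CONTAINER_ID"` is the name of `ApplicationConstants.Environment.CONTAINER_ID`.

```scala
object ContainerIdSketch {
  // Hypothetical placeholder; the real value would be whatever the project settles on.
  val FallbackId: String = "unmanaged-am"

  // Returns the YARN-provided container ID when present, else the baked-in placeholder.
  def containerIdString(env: Map[String, String]): String =
    env.getOrElse("CONTAINER_ID", FallbackId)
}
```

Called as `ContainerIdSketch.containerIdString(sys.env)`, this avoids writing a synthetic container ID into the configuration.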





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714886
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -617,7 +625,14 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 try {
   val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
   if (!preserveFiles) {
-stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
 
 Review comment:
   `val stagingDir = sys.props.get("...").getOrElse { ... }`
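
A minimal sketch of the `Option`-based form, which replaces the `var` plus null check with a single expression. The comment mentions `sys.props` while the diff reads an environment variable, so the lookup source is passed in as a map here and either can be supplied. `StagingDirSketch` and the `fallback` parameter are hypothetical.

```scala
object StagingDirSketch {
  // `fallback` is by-name, so it is evaluated only when the variable is missing.
  def stagingDir(source: Map[String, String], fallback: => String): String =
    source.get("SPARK_YARN_STAGING_DIR").getOrElse(fallback)
}
```

For the environment-variable case the call would look like `StagingDirSketch.stagingDir(sys.env, computeDefault())`, where `computeDefault` stands for whatever default computation applies.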





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715964
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1098,14 +1109,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
 
 Review comment:
   indent one more level





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242716228
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1098,14 +1109,39 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never 
happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] 
=
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+token.getIdentifier().array(), token.getPassword().array,
+new Text(token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+sparkConf.set("spark.yarn.containerId",
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 
1).toString)
+// Start Application Service in a separate thread and continue with 
application monitoring
+val amService = new Thread("Unmanaged Application Master Service") {
+  override def run(): Unit = new ApplicationMaster(new ApplicationMasterArguments(Array.empty),
 
 Review comment:
   This is a pretty long line. Break it down.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242715378
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -617,7 +625,14 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 try {
   val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
   if (!preserveFiles) {
-stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
+if (stagingDir == null) {
+  val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) 
}
 
 Review comment:
   This looks similar to the logic in `Client.scala`. Maybe the value 
calculated there should be plumbed through, instead of adding this code.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-18 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r242714148
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -234,20 +218,26 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 resources.toMap
   }
 
-  def getAttemptId(): ApplicationAttemptId = {
-client.getAttemptId()
+  def getAttemptId(sparkConf: SparkConf): ApplicationAttemptId = {
+client.getAttemptId(sparkConf)
   }
 
   final def run(): Int = {
 doAsUser {
-  runImpl()
+  runImpl(
+if (isClusterMode) {
+  runDriver()
+} else {
+  runExecutorLauncher()
+}
+  )
 }
 exitCode
   }
 
-  private def runImpl(): Unit = {
+  private def runImpl(opBlock: => Unit): Unit = {
 
 Review comment:
   There are things in this method that don't look right when you think about 
an unmanaged AM.
   
   e.g., overriding `spark.master`, `spark.ui.port`, etc., looks wrong.
   
   The handling of app attempts also seems wrong, since with an unmanaged AM 
you don't have multiple attempts. Even the shutdown hooks seem a bit out of 
place.
   
   Seems to me it would be easier not to try to use this method for the 
unmanaged AM.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240421106
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1084,14 +1095,38 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never 
happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] 
=
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+.getIdentifier().array(), token.getPassword().array, new Text(
+token.getKind()), new Text(token.getService()))
+val currentUGI = UserGroupInformation.getCurrentUser
+currentUGI.addToken(amRMToken)
+
+sparkConf.set("spark.yarn.containerId",
+  ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 
1).toString)
+// Start Application Service in a separate thread and continue with 
application monitoring
+val amService = new Thread() {
 
 Review comment:
   Thread name?
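
A short sketch of giving the thread a name through the `Thread(String)` constructor, so it is identifiable in thread dumps and logs. `ThreadNameSketch.named` is a hypothetical helper; the name string used in the note below is the one that appears in a later revision of this diff.

```scala
object ThreadNameSketch {
  // Creates (but does not start) a thread with the given name that runs `body`.
  def named(name: String)(body: => Unit): Thread =
    new Thread(name) {
      override def run(): Unit = body
    }
}
```

For example, `ThreadNameSketch.named("Unmanaged Application Master Service") { /* run the AM */ }` yields a thread whose `getName` returns that string.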





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240419661
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -656,7 +664,9 @@ private[spark] class Client(
 // Clear the cache-related entries from the configuration to avoid them 
polluting the
 // UI's environment page. This works for client mode; for cluster mode, 
this is handled
 // by the AM.
-CACHE_CONFIGS.foreach(sparkConf.remove)
+if (!isClientUnmanagedAMEnabled) {
 
 Review comment:
   I think this is happening because you're starting the AM after these are 
removed from the conf. Should probably juggle things around or change how these 
are provided to the AM, since these configs are super noisy and shouldn't 
really show up in the UI.





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240421285
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##
 @@ -1084,14 +1095,38 @@ private[spark] class Client(
   if (returnOnRunning && state == YarnApplicationState.RUNNING) {
 return createAppReport(report)
   }
-
+  if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+&& !amServiceStarted && report.getAMRMToken != null) {
+amServiceStarted = true
+startApplicationMasterService(report)
+  }
   lastState = state
 }
 
 // Never reached, but keeps compiler happy
 throw new SparkException("While loop is depleted! This should never 
happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+// Add AMRMToken to establish connection between RM and AM
+val token = report.getAMRMToken
+val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] 
=
+  new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
 
 Review comment:
   Keep related calls on the same line (e.g. `token.getIdentifier()`, `new 
Text(blah)`)





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240419819
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 ##
 @@ -183,8 +183,13 @@ object YarnSparkHadoopUtil {
 )
   }
 
-  def getContainerId: ContainerId = {
-val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+  def getContainerId(sparkConf: SparkConf): ContainerId = {
+  val containerIdString =
+  if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
 
 Review comment:
   better to use `sparkConf.getenv`.
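
One likely reason for this suggestion is that routing the lookup through the conf object lets tests substitute the environment. The sketch below illustrates that idea only. `ConfLike` is a hypothetical stand-in for `SparkConf`, the `fallback` parameter is an assumption (the diff above is truncated before the else branch), and the variable name `CONTAINER_ID` is assumed to match `ApplicationConstants.Environment.CONTAINER_ID.name()`.

```scala
// Hypothetical stand-in for SparkConf: only the environment lookup is modelled.
class ConfLike {
  // Overridable in tests; the default delegates to the process environment.
  def getenv(name: String): String = System.getenv(name)
}

object ContainerIdLookup {
  // Assumed to equal ApplicationConstants.Environment.CONTAINER_ID.name().
  val ContainerIdEnv = "CONTAINER_ID"

  // Returns the container id string from the environment, or the supplied
  // fallback (an assumption of this sketch) when the variable is unset.
  def containerIdString(conf: ConfLike, fallback: => String): String =
    Option(conf.getenv(ContainerIdEnv)).getOrElse(fallback)
}
```

A test can then override `getenv` on the stand-in instead of mutating the real process environment.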





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240419719
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
 ##
 @@ -183,8 +183,13 @@ object YarnSparkHadoopUtil {
 )
   }
 
-  def getContainerId: ContainerId = {
-val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+  def getContainerId(sparkConf: SparkConf): ContainerId = {
+  val containerIdString =
 
 Review comment:
   indentation





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240421526
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -51,32 +52,27 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
+private[spark] class ApplicationMaster(
+val args: ApplicationMasterArguments,
+val sparkConf: SparkConf,
+val yarnConf: YarnConfiguration)
+  extends Logging {
 
+  def this(sparkConf: SparkConf,
 
 Review comment:
   See above constructor for multi-line args style.
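
A sketch of an auxiliary constructor laid out like the primary one: each parameter on its own line with a 4-space indent, and `extends` indented by 2 spaces. `Args`, `Conf`, `LoggingLike`, and `Master` are hypothetical stand-ins, and the auxiliary constructor's parameter list is invented for illustration because the diff above is cut off after its first parameter.

```scala
// Hypothetical stand-in types so the sketch compiles on its own; not Spark's classes.
trait LoggingLike
final case class Args(values: Seq[String])
final case class Conf(settings: Map[String, String])

class Master(
    val args: Args,
    val conf: Conf)
  extends LoggingLike {

  // Auxiliary constructor formatted like the primary constructor above.
  // Its parameters are illustrative only.
  def this(
      conf: Conf,
      rawArgs: Seq[String]) = {
    this(Args(rawArgs), conf)
  }
}
```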





[GitHub] vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

2018-12-10 Thread GitBox
vanzin commented on a change in pull request #19616: [SPARK-22404][YARN] 
Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r240422378
 
 

 ##
 File path: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ##
 @@ -481,20 +478,29 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def runExecutorLauncher(): Unit = {
-val hostname = Utils.localHostName
-val amCores = sparkConf.get(AM_CORES)
-rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
-  amCores, true)
-
-// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
-registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
-
-// The driver should be up and listening, so unlike cluster mode, just try to connect to it
-// with no waiting or retrying.
-val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
-val driverRef = rpcEnv.setupEndpointRef(
-  RpcAddress(driverHost, driverPort),
-  YarnSchedulerBackend.ENDPOINT_NAME)
+var driverRef : RpcEndpointRef = null
+if (sparkConf.get(YARN_UNMANAGED_AM)) {
 
 Review comment:
   I'm not a big fan of this change. Feels like you should have a different method here called `runUnmanaged` that is called instead of `run()` and takes an `RpcEnv`.
   
   That way you don't need to keep `clientRpcEnv` at all: nothing else here needs it, so it can be local to that method. In fact even `rpcEnv` could go away and become a parameter to `createAllocator`...
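
The shape of this suggestion can be sketched roughly as follows: a separate entry point receives the client's RPC environment as an argument, and the allocator factory takes the environment as a parameter, so no `rpcEnv` or `clientRpcEnv` field is needed. `RpcEnvLike`, `AllocatorLike`, and `MasterSketch` are hypothetical stand-ins, not Spark's `RpcEnv`, `YarnAllocator`, or `ApplicationMaster`.

```scala
// Hypothetical stand-ins; not Spark's RpcEnv or YarnAllocator.
final case class RpcEnvLike(name: String)
final case class AllocatorLike(rpcEnvName: String)

class MasterSketch {
  // No rpcEnv / clientRpcEnv fields: the environment is passed where it is needed.
  private def createAllocator(rpcEnv: RpcEnvLike): AllocatorLike =
    AllocatorLike(rpcEnv.name)

  // Managed path: the AM creates its own RPC environment as a local value.
  def run(): AllocatorLike = {
    val rpcEnv = RpcEnvLike("sparkYarnAM")
    createAllocator(rpcEnv)
  }

  // Unmanaged path: the client hands in the RPC environment it already owns.
  def runUnmanaged(clientRpcEnv: RpcEnvLike): AllocatorLike =
    createAllocator(clientRpcEnv)
}
```

Keeping the environment out of the class state means the two paths share `createAllocator` without a flag check inside a common method.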

