Repository: spark
Updated Branches:
  refs/heads/master 5fde66163 -> 8fdd48959
[SPARK-2165][YARN] add support for setting maxAppAttempts in the ApplicationSubmissionContext

https://issues.apache.org/jira/browse/SPARK-2165

I still have two questions:
* If this config is not set, should we use YARN's corresponding value or a default value (like 2) on the Spark side?
* Is this config name the best choice, or should it be "spark.yarn.am.maxAttempts"?

Author: WangTaoTheTonic <barneystin...@aliyun.com>

Closes #3878 from WangTaoTheTonic/SPARK-2165 and squashes the following commits:

1416c83 [WangTaoTheTonic] use the name spark.yarn.maxAppAttempts
202ac85 [WangTaoTheTonic] rephrase some
afdfc99 [WangTaoTheTonic] more detailed description
91562c6 [WangTaoTheTonic] add support for setting maxAppAttempts in the ApplicationSubmissionContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fdd4895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fdd4895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fdd4895

Branch: refs/heads/master
Commit: 8fdd48959c93b9cf809f03549e2ae6c4687d1fcd
Parents: 5fde661
Author: WangTaoTheTonic <barneystin...@aliyun.com>
Authored: Wed Jan 7 08:14:39 2015 -0600
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Jan 7 08:14:39 2015 -0600

----------------------------------------------------------------------
 docs/running-on-yarn.md                                      | 8 ++++++++
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala     | 2 +-
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++++
 .../scala/org/apache/spark/deploy/yarn/YarnRMClient.scala    | 7 +++++--
 4 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8fdd4895/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index da1c8e8..183698f 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td>yarn.resourcemanager.am.max-attempts in YARN</td>
+  <td>
+  The maximum number of attempts that will be made to submit the application.
+  It should be no larger than the global maximum number of attempts in the YARN configuration.
+  </td>
+</tr>
 </table>

 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/8fdd4895/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 618db7f..902bdda 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         logInfo("Invoking sc stop from shutdown hook")
         sc.stop()
       }
-      val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
       val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

       if (!finished) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fdd4895/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb..a2c3f91 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+          "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)

http://git-wip-us.apache.org/repos/asf/spark/blob/8fdd4895/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e159..e183efc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }

   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 }
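
As a usage note (not part of the commit itself; the application name and the attempt value below are illustrative), the new property is read from SparkConf like any other Spark setting, so it can be set programmatically before submission. A minimal sketch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: cap this application's submission attempts at 2.
    // "ExampleApp" and the value 2 are placeholders; the value should be
    // no larger than the cluster-wide yarn.resourcemanager.am.max-attempts.
    val conf = new SparkConf()
      .setAppName("ExampleApp")
      .set("spark.yarn.maxAppAttempts", "2")
    val sc = new SparkContext(conf)

Equivalently, it can be passed on the command line with spark-submit --conf spark.yarn.maxAppAttempts=2. When the property is unset, getMaxRegAttempts falls back to the value from the YARN configuration, as the YarnRMClient.scala hunk above shows.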