Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17646215
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -40,151 +39,102 @@ import org.apache.hadoop.yarn.util.Records
     import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
     
     /**
    - * The entry point (starting in Client#main() and Client#run()) for 
launching Spark on YARN. The
    - * Client submits an application to the YARN ResourceManager.
    + * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
    + * The Client submits an application to the YARN ResourceManager.
      */
    -trait ClientBase extends Logging {
    -  val args: ClientArguments
    -  val conf: Configuration
    -  val sparkConf: SparkConf
    -  val yarnConf: YarnConfiguration
    -  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    -  private val SPARK_STAGING: String = ".sparkStaging"
    +private[spark] trait ClientBase extends Logging {
    +  import ClientBase._
    +
    +  protected val args: ClientArguments
    +  protected val hadoopConf: Configuration
    +  protected val sparkConf: SparkConf
    +  protected val yarnConf: YarnConfiguration
    +  protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
    +  protected val amMemoryOverhead = args.amMemoryOverhead // MB
    +  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
       private val distCacheMgr = new ClientDistributedCacheManager()
     
    -  // Staging directory is private! -> rwx--------
    -  val STAGING_DIR_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
    -  // App files are world-wide readable and owner writable -> rw-r--r--
    -  val APP_FILE_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
    -
    -  // Additional memory overhead - in mb.
    -  protected def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.driver.memoryOverhead",
    -    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    -
    -  // TODO(harvey): This could just go in ClientArguments.
    -  def validateArgs() = {
    -    Map(
    -      (args.numExecutors <= 0) -> "Error: You must specify at least 1 
executor!",
    -      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must 
be" +
    -        "greater than: " + memoryOverhead),
    -      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory 
size" +
    -        "must be greater than: " + memoryOverhead.toString)
    -    ).foreach { case(cond, errStr) =>
    -      if (cond) {
    -        logError(errStr)
    -        throw new IllegalArgumentException(args.getUsageMessage())
    -      }
    -    }
    -  }
    -
    -  def getAppStagingDir(appId: ApplicationId): String = {
    -    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
    -  }
    -
    -  def verifyClusterResources(app: GetNewApplicationResponse) = {
    -    val maxMem = app.getMaximumResourceCapability().getMemory()
    -    logInfo("Max mem capabililty of a single resource in this cluster " + 
maxMem)
    -
    -    // If we have requested more then the clusters max for a single 
resource then exit.
    -    if (args.executorMemory > maxMem) {
    -      val errorMessage =
    -        "Required executor memory (%d MB), is above the max threshold (%d 
MB) of this cluster."
    -          .format(args.executorMemory, maxMem)
    -
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    -    }
    -    val amMem = args.amMemory + memoryOverhead
    +  /**
    +   * Fail fast if we have requested more resources per container than is available in the cluster.
    +   */
    +  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
    +    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    +    logInfo("Verifying our application has not requested more than the maximum " +
    +      s"memory capability of the cluster ($maxMem MB per container)")
    +    val executorMem = args.executorMemory + executorMemoryOverhead
    +    if (executorMem > maxMem) {
    +      throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
    --- End diff --
    
    Yes, I verified this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to