Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17633753
  
    --- Diff: 
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -40,151 +39,102 @@ import org.apache.hadoop.yarn.util.Records
     import org.apache.spark.{Logging, SecurityManager, SparkConf, 
SparkContext, SparkException}
     
     /**
    - * The entry point (starting in Client#main() and Client#run()) for 
launching Spark on YARN. The
    - * Client submits an application to the YARN ResourceManager.
    + * The entry point (starting in Client#main() and Client#run()) for 
launching Spark on YARN.
    + * The Client submits an application to the YARN ResourceManager.
      */
    -trait ClientBase extends Logging {
    -  val args: ClientArguments
    -  val conf: Configuration
    -  val sparkConf: SparkConf
    -  val yarnConf: YarnConfiguration
    -  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    -  private val SPARK_STAGING: String = ".sparkStaging"
    +private[spark] trait ClientBase extends Logging {
    +  import ClientBase._
    +
    +  protected val args: ClientArguments
    +  protected val hadoopConf: Configuration
    +  protected val sparkConf: SparkConf
    +  protected val yarnConf: YarnConfiguration
    +  protected val credentials = 
UserGroupInformation.getCurrentUser.getCredentials
    +  protected val amMemoryOverhead = args.amMemoryOverhead // MB
    +  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
       private val distCacheMgr = new ClientDistributedCacheManager()
     
    -  // Staging directory is private! -> rwx--------
    -  val STAGING_DIR_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
    -  // App files are world-wide readable and owner writable -> rw-r--r--
    -  val APP_FILE_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
    -
    -  // Additional memory overhead - in mb.
    -  protected def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.driver.memoryOverhead",
    -    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    -
    -  // TODO(harvey): This could just go in ClientArguments.
    -  def validateArgs() = {
    -    Map(
    -      (args.numExecutors <= 0) -> "Error: You must specify at least 1 
executor!",
    -      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must 
be" +
    -        "greater than: " + memoryOverhead),
    -      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory 
size" +
    -        "must be greater than: " + memoryOverhead.toString)
    -    ).foreach { case(cond, errStr) =>
    -      if (cond) {
    -        logError(errStr)
    -        throw new IllegalArgumentException(args.getUsageMessage())
    -      }
    -    }
    -  }
    -
    -  def getAppStagingDir(appId: ApplicationId): String = {
    -    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
    -  }
    -
    -  def verifyClusterResources(app: GetNewApplicationResponse) = {
    -    val maxMem = app.getMaximumResourceCapability().getMemory()
    -    logInfo("Max mem capabililty of a single resource in this cluster " + 
maxMem)
    -
    -    // If we have requested more then the clusters max for a single 
resource then exit.
    -    if (args.executorMemory > maxMem) {
    -      val errorMessage =
    -        "Required executor memory (%d MB), is above the max threshold (%d 
MB) of this cluster."
    -          .format(args.executorMemory, maxMem)
    -
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    -    }
    -    val amMem = args.amMemory + memoryOverhead
    +  /**
    +   * Fail fast if we have requested more resources per container than is 
available in the cluster.
    +   */
    +  protected def verifyClusterResources(newAppResponse: 
GetNewApplicationResponse): Unit = {
    +    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    +    logInfo("Verifying our application has not requested more than the 
maximum " +
    +      s"memory capability of the cluster ($maxMem MB per container)")
    +    val executorMem = args.executorMemory + executorMemoryOverhead
    +    if (executorMem > maxMem) {
    +      throw new IllegalArgumentException(s"Required executor memory 
($executorMem MB) " +
    +        s"is above the max threshold ($maxMem MB) of this cluster!")
    +    }
    +    val amMem = getAMMemory(newAppResponse) + amMemoryOverhead
         if (amMem > maxMem) {
    -
    -      val errorMessage = "Required AM memory (%d) is above the max 
threshold (%d) of this cluster."
    -        .format(amMem, maxMem)
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    +      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) 
" +
    +        s"is above the max threshold ($maxMem MB) of this cluster!")
         }
    -
         // We could add checks to make sure the entire cluster has enough 
resources but that involves
         // getting all the node reports and computing ourselves.
       }
     
    -  /** See if two file systems are the same or not. */
    -  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
    -    val srcUri = srcFs.getUri()
    -    val dstUri = destFs.getUri()
    -    if (srcUri.getScheme() == null) {
    -      return false
    -    }
    -    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    -      return false
    -    }
    -    var srcHost = srcUri.getHost()
    -    var dstHost = dstUri.getHost()
    -    if ((srcHost != null) && (dstHost != null)) {
    -      try {
    -        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
    -        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
    -      } catch {
    -        case e: UnknownHostException =>
    -          return false
    -      }
    -      if (!srcHost.equals(dstHost)) {
    -        return false
    -      }
    -    } else if (srcHost == null && dstHost != null) {
    -      return false
    -    } else if (srcHost != null && dstHost == null) {
    -      return false
    -    }
    -    if (srcUri.getPort() != dstUri.getPort()) {
    -      false
    -    } else {
    -      true
    -    }
    -  }
    -
    -  /** Copy the file into HDFS if needed. */
    -  private[yarn] def copyRemoteFile(
    -      dstDir: Path,
    -      originalPath: Path,
    +  /**
    +   * Copy the given file to a remote file system if needed. This is used 
for preparing
    +   * resources for launching the ApplicationMaster container. Exposed for 
testing.
    +   */
    +  def copyFileToRemote(
    +      destDir: Path,
    +      srcPath: Path,
           replication: Short,
           setPerms: Boolean = false): Path = {
    -    val fs = FileSystem.get(conf)
    -    val remoteFs = originalPath.getFileSystem(conf)
    -    var newPath = originalPath
    -    if (!compareFs(remoteFs, fs)) {
    -      newPath = new Path(dstDir, originalPath.getName())
    -      logInfo("Uploading " + originalPath + " to " + newPath)
    -      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
    -      fs.setReplication(newPath, replication)
    -      if (setPerms) fs.setPermission(newPath, new 
FsPermission(APP_FILE_PERMISSION))
    +    val destFs = destDir.getFileSystem(hadoopConf)
    +    val srcFs = srcPath.getFileSystem(hadoopConf)
    +    var destPath = srcPath
    +    if (!compareFs(srcFs, destFs)) {
    +      destPath = new Path(destDir, srcPath.getName())
    +      logInfo(s"Uploading resource $srcPath -> $destPath")
    +      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
    +      destFs.setReplication(destPath, replication)
    +      if (setPerms) {
    +        destFs.setPermission(destPath, new 
FsPermission(APP_FILE_PERMISSION))
    +      }
    +    } else {
    +      logInfo(s"Source and destination file systems are the same. Not 
copying $srcPath")
    --- End diff --
    
    I see, makes sense. (I actually added this `logInfo`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to