Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2350#discussion_r17861794
  
    --- Diff: 
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -37,154 +36,106 @@ import org.apache.hadoop.yarn.api.protocolrecords._
     import org.apache.hadoop.yarn.api.records._
     import org.apache.hadoop.yarn.conf.YarnConfiguration
     import org.apache.hadoop.yarn.util.Records
    +
     import org.apache.spark.{Logging, SecurityManager, SparkConf, 
SparkContext, SparkException}
     
     /**
    - * The entry point (starting in Client#main() and Client#run()) for 
launching Spark on YARN. The
    - * Client submits an application to the YARN ResourceManager.
    + * The entry point (starting in Client#main() and Client#run()) for 
launching Spark on YARN.
    + * The Client submits an application to the YARN ResourceManager.
      */
    -trait ClientBase extends Logging {
    -  val args: ClientArguments
    -  val conf: Configuration
    -  val sparkConf: SparkConf
    -  val yarnConf: YarnConfiguration
    -  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    -  private val SPARK_STAGING: String = ".sparkStaging"
    +private[spark] trait ClientBase extends Logging {
    +  import ClientBase._
    +
    +  protected val args: ClientArguments
    +  protected val hadoopConf: Configuration
    +  protected val sparkConf: SparkConf
    +  protected val yarnConf: YarnConfiguration
    +  protected val credentials = 
UserGroupInformation.getCurrentUser.getCredentials
    +  protected val amMemoryOverhead = args.amMemoryOverhead // MB
    +  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
       private val distCacheMgr = new ClientDistributedCacheManager()
     
    -  // Staging directory is private! -> rwx--------
    -  val STAGING_DIR_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
    -  // App files are world-wide readable and owner writable -> rw-r--r--
    -  val APP_FILE_PERMISSION: FsPermission =
    -    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
    -
    -  // Additional memory overhead - in mb.
    -  protected def memoryOverhead: Int = 
sparkConf.getInt("spark.yarn.driver.memoryOverhead",
    -    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
    -
    -  // TODO(harvey): This could just go in ClientArguments.
    -  def validateArgs() = {
    -    Map(
    -      (args.numExecutors <= 0) -> "Error: You must specify at least 1 
executor!",
    -      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must 
be" +
    -        "greater than: " + memoryOverhead),
    -      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory 
size" +
    -        "must be greater than: " + memoryOverhead.toString)
    -    ).foreach { case(cond, errStr) =>
    -      if (cond) {
    -        logError(errStr)
    -        throw new IllegalArgumentException(args.getUsageMessage())
    -      }
    -    }
    -  }
    -
    -  def getAppStagingDir(appId: ApplicationId): String = {
    -    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
    -  }
    -
    -  def verifyClusterResources(app: GetNewApplicationResponse) = {
    -    val maxMem = app.getMaximumResourceCapability().getMemory()
    -    logInfo("Max mem capabililty of a single resource in this cluster " + 
maxMem)
    -
    -    // If we have requested more then the clusters max for a single 
resource then exit.
    -    if (args.executorMemory > maxMem) {
    -      val errorMessage =
    -        "Required executor memory (%d MB), is above the max threshold (%d 
MB) of this cluster."
    -          .format(args.executorMemory, maxMem)
    -
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    -    }
    -    val amMem = args.amMemory + memoryOverhead
    +  /**
    +   * Fail fast if we have requested more resources per container than is 
available in the cluster.
    +   */
    +  protected def verifyClusterResources(newAppResponse: 
GetNewApplicationResponse): Unit = {
    +    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    +    logInfo("Verifying our application has not requested more than the 
maximum " +
    +      s"memory capability of the cluster ($maxMem MB per container)")
    +    val executorMem = args.executorMemory + executorMemoryOverhead
    +    if (executorMem > maxMem) {
    +      throw new IllegalArgumentException(s"Required executor memory 
($executorMem MB) " +
    +        s"is above the max threshold ($maxMem MB) of this cluster!")
    +    }
    +    val amMem = args.amMemory + amMemoryOverhead
         if (amMem > maxMem) {
    -
    -      val errorMessage = "Required AM memory (%d) is above the max 
threshold (%d) of this cluster."
    -        .format(amMem, maxMem)
    -      logError(errorMessage)
    -      throw new IllegalArgumentException(errorMessage)
    +      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) 
" +
    +        s"is above the max threshold ($maxMem MB) of this cluster!")
         }
    -
         // We could add checks to make sure the entire cluster has enough 
resources but that involves
         // getting all the node reports and computing ourselves.
       }
     
    -  /** See if two file systems are the same or not. */
    -  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
    -    val srcUri = srcFs.getUri()
    -    val dstUri = destFs.getUri()
    -    if (srcUri.getScheme() == null) {
    -      return false
    -    }
    -    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
    -      return false
    -    }
    -    var srcHost = srcUri.getHost()
    -    var dstHost = dstUri.getHost()
    -    if ((srcHost != null) && (dstHost != null)) {
    -      try {
    -        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
    -        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
    -      } catch {
    -        case e: UnknownHostException =>
    -          return false
    -      }
    -      if (!srcHost.equals(dstHost)) {
    -        return false
    -      }
    -    } else if (srcHost == null && dstHost != null) {
    -      return false
    -    } else if (srcHost != null && dstHost == null) {
    -      return false
    -    }
    -    if (srcUri.getPort() != dstUri.getPort()) {
    -      false
    -    } else {
    -      true
    -    }
    -  }
    -
    -  /** Copy the file into HDFS if needed. */
    -  private[yarn] def copyRemoteFile(
    -      dstDir: Path,
    -      originalPath: Path,
    +  /**
    +   * Copy the given file to a remote file system if needed. This is used 
for preparing
    +   * resources for launching the ApplicationMaster container. Exposed for 
testing.
    +   */
    +  def copyFileToRemote(
    --- End diff --
    
    Its supposed to support copying it from other locations other then just 
file://, like another HDFS location.  The local variable is named badly 
(localPath).  Its actually reading what is specified by the user for the user 
app jar, spark jar, etc.
    
    I guess it depends on the definition of remote here.  In general when 
running on yarn I wouldn't consider the hdfs installation on that yarn cluster 
as remote.   But its all in the perception.  I'm fine with the name as long as 
we are clear in description of what it does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to