MAPREDUCE-6690. Limit the number of resources a single map reduce job can submit for localization. Contributed by Chris Trezzo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f80a7298
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f80a7298
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f80a7298

Branch: refs/heads/YARN-2915
Commit: f80a7298325a4626638ee24467e2012442e480d4
Parents: 7f05ff7
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Aug 17 16:21:20 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Aug 17 16:22:31 2016 +0000

----------------------------------------------------------------------
 .../hadoop/mapreduce/JobResourceUploader.java   | 214 +++++++++--
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  28 ++
 .../ClientDistributedCacheManager.java          |  15 +-
 .../src/main/resources/mapred-default.xml       |  30 ++
 .../mapreduce/TestJobResourceUploader.java      | 355 +++++++++++++++++++
 .../apache/hadoop/mapreduce/v2/TestMRJobs.java  | 166 ++++++++-
 6 files changed, 776 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
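
The patch adds three client-side limits, all disabled by default, that are enforced in JobResourceUploader before any resource is copied to the staging directory. A minimal sketch of how a submitter might enable them (the job setup around the three conf calls is illustrative only, and the snippet assumes an enclosing method declared to throw Exception):

  Configuration conf = new Configuration();
  // Fail submission when a job localizes more than 10 resources in total.
  conf.setInt(MRJobConfig.MAX_RESOURCES, 10);
  // Fail submission when the combined size of all resources exceeds 512MB.
  conf.setLong(MRJobConfig.MAX_RESOURCES_MB, 512);
  // Fail submission when any single resource exceeds 256MB.
  conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 256);

  Job job = Job.getInstance(conf, "limited-job");
  try {
    job.submit();
  } catch (IOException e) {
    // Violations fail fast on the client; the message names the violated
    // limit (see the MAX_*_ERR_MSG constants in the diff below).
    System.err.println("Resource limit violated: " + e.getMessage());
  }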


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index fa4dd86..15dbc13 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -21,12 +21,16 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +38,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class JobResourceUploader {
@@ -86,31 +92,37 @@ class JobResourceUploader {
     FsPermission mapredSysPerms =
         new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
     FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
-    // add all the command line files/ jars and archive
-    // first copy them to jobtrackers filesystem
 
-    uploadFiles(conf, submitJobDir, mapredSysPerms, replication);
-    uploadLibJars(conf, submitJobDir, mapredSysPerms, replication);
-    uploadArchives(conf, submitJobDir, mapredSysPerms, replication);
-    uploadJobJar(job, submitJobDir, replication);
+    Collection<String> files = conf.getStringCollection("tmpfiles");
+    Collection<String> libjars = conf.getStringCollection("tmpjars");
+    Collection<String> archives = conf.getStringCollection("tmparchives");
+    String jobJar = job.getJar();
+
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
+
+    uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
+    uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
+    uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
+    uploadJobJar(job, jobJar, submitJobDir, replication);
     addLog4jToDistributedCache(job, submitJobDir);
 
     // set the timestamps of the archives and files
     // set the public/private visibility of the archives and files
-    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
+    ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
+        statCache);
     // get DelegationToken for cached file
     ClientDistributedCacheManager.getDelegationTokens(conf,
         job.getCredentials());
   }
 
-  private void uploadFiles(Configuration conf, Path submitJobDir,
-      FsPermission mapredSysPerms, short submitReplication) throws IOException {
-    String files = conf.get("tmpfiles");
+  private void uploadFiles(Configuration conf, Collection<String> files,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+      throws IOException {
     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
-    if (files != null) {
+    if (!files.isEmpty()) {
       FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
-      String[] fileArr = files.split(",");
-      for (String tmpFile : fileArr) {
+      for (String tmpFile : files) {
         URI tmpURI = null;
         try {
           tmpURI = new URI(tmpFile);
@@ -130,14 +142,13 @@ class JobResourceUploader {
     }
   }
 
-  private void uploadLibJars(Configuration conf, Path submitJobDir,
-      FsPermission mapredSysPerms, short submitReplication) throws IOException {
-    String libjars = conf.get("tmpjars");
+  private void uploadLibJars(Configuration conf, Collection<String> libjars,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+      throws IOException {
     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
-    if (libjars != null) {
+    if (!libjars.isEmpty()) {
       FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
-      String[] libjarsArr = libjars.split(",");
-      for (String tmpjars : libjarsArr) {
+      for (String tmpjars : libjars) {
         Path tmp = new Path(tmpjars);
         Path newPath =
             copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
@@ -157,14 +168,13 @@ class JobResourceUploader {
     }
   }
 
-  private void uploadArchives(Configuration conf, Path submitJobDir,
-      FsPermission mapredSysPerms, short submitReplication) throws IOException {
-    String archives = conf.get("tmparchives");
+  private void uploadArchives(Configuration conf, Collection<String> archives,
+      Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
+      throws IOException {
     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
-    if (archives != null) {
+    if (!archives.isEmpty()) {
       FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
-      String[] archivesArr = archives.split(",");
-      for (String tmpArchives : archivesArr) {
+      for (String tmpArchives : archives) {
         URI tmpURI;
         try {
           tmpURI = new URI(tmpArchives);
@@ -185,9 +195,8 @@ class JobResourceUploader {
     }
   }
 
-  private void uploadJobJar(Job job, Path submitJobDir, short submitReplication)
-      throws IOException {
-    String jobJar = job.getJar();
+  private void uploadJobJar(Job job, String jobJar, Path submitJobDir,
+      short submitReplication) throws IOException {
     if (jobJar != null) { // copy jar to JobTracker's fs
       // use jar name if job is not named.
       if ("".equals(job.getJobName())) {
@@ -208,6 +217,155 @@ class JobResourceUploader {
     }
   }
 
+  /**
+   * Verify that the resources this job is going to localize are within the
+   * localization limits.
+   */
+  @VisibleForTesting
+  void checkLocalizationLimits(Configuration conf, Collection<String> files,
+      Collection<String> libjars, Collection<String> archives, String jobJar,
+      Map<URI, FileStatus> statCache) throws IOException {
+
+    LimitChecker limitChecker = new LimitChecker(conf);
+    if (!limitChecker.hasLimits()) {
+      // there are no limits set, so we are done.
+      return;
+    }
+
+    // Get the files and archives that are already in the distributed cache
+    Collection<String> dcFiles =
+        conf.getStringCollection(MRJobConfig.CACHE_FILES);
+    Collection<String> dcArchives =
+        conf.getStringCollection(MRJobConfig.CACHE_ARCHIVES);
+
+    for (String path : dcFiles) {
+      explorePath(conf, new Path(path), limitChecker, statCache);
+    }
+
+    for (String path : dcArchives) {
+      explorePath(conf, new Path(path), limitChecker, statCache);
+    }
+
+    for (String path : files) {
+      explorePath(conf, new Path(path), limitChecker, statCache);
+    }
+
+    for (String path : libjars) {
+      explorePath(conf, new Path(path), limitChecker, statCache);
+    }
+
+    for (String path : archives) {
+      explorePath(conf, new Path(path), limitChecker, statCache);
+    }
+
+    if (jobJar != null) {
+      explorePath(conf, new Path(jobJar), limitChecker, statCache);
+    }
+  }
+
+  @VisibleForTesting
+  protected static final String MAX_RESOURCE_ERR_MSG =
+      "This job has exceeded the maximum number of submitted resources";
+  @VisibleForTesting
+  protected static final String MAX_TOTAL_RESOURCE_MB_ERR_MSG =
+      "This job has exceeded the maximum size of submitted resources";
+  @VisibleForTesting
+  protected static final String MAX_SINGLE_RESOURCE_MB_ERR_MSG =
+      "This job has exceeded the maximum size of a single submitted resource";
+
+  private static class LimitChecker {
+    LimitChecker(Configuration conf) {
+      this.maxNumOfResources =
+          conf.getInt(MRJobConfig.MAX_RESOURCES,
+              MRJobConfig.MAX_RESOURCES_DEFAULT);
+      this.maxSizeMB =
+          conf.getLong(MRJobConfig.MAX_RESOURCES_MB,
+              MRJobConfig.MAX_RESOURCES_MB_DEFAULT);
+      this.maxSizeOfResourceMB =
+          conf.getLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
+              MRJobConfig.MAX_SINGLE_RESOURCE_MB_DEFAULT);
+      this.totalConfigSizeBytes = maxSizeMB * 1024 * 1024;
+      this.totalConfigSizeOfResourceBytes = maxSizeOfResourceMB * 1024 * 1024;
+    }
+
+    private long totalSizeBytes = 0;
+    private int totalNumberOfResources = 0;
+    private long currentMaxSizeOfFileBytes = 0;
+    private final long maxSizeMB;
+    private final int maxNumOfResources;
+    private final long maxSizeOfResourceMB;
+    private final long totalConfigSizeBytes;
+    private final long totalConfigSizeOfResourceBytes;
+
+    private boolean hasLimits() {
+      return maxNumOfResources > 0 || maxSizeMB > 0 || maxSizeOfResourceMB > 0;
+    }
+
+    private void addFile(Path p, long fileSizeBytes) throws IOException {
+      totalNumberOfResources++;
+      totalSizeBytes += fileSizeBytes;
+      if (fileSizeBytes > currentMaxSizeOfFileBytes) {
+        currentMaxSizeOfFileBytes = fileSizeBytes;
+      }
+
+      if (totalConfigSizeBytes > 0 && totalSizeBytes > totalConfigSizeBytes) {
+        throw new IOException(MAX_TOTAL_RESOURCE_MB_ERR_MSG + " (Max: "
+            + maxSizeMB + "MB).");
+      }
+
+      if (maxNumOfResources > 0 &&
+          totalNumberOfResources > maxNumOfResources) {
+        throw new IOException(MAX_RESOURCE_ERR_MSG + " (Max: "
+            + maxNumOfResources + ").");
+      }
+
+      if (totalConfigSizeOfResourceBytes > 0
+          && currentMaxSizeOfFileBytes > totalConfigSizeOfResourceBytes) {
+        throw new IOException(MAX_SINGLE_RESOURCE_MB_ERR_MSG + " (Max: "
+            + maxSizeOfResourceMB + "MB, Violating resource: " + p + ").");
+      }
+    }
+  }
+
+  /**
+   * Recursively explore the given path and enforce the limits for resource
+   * localization. This method assumes that there are no symlinks in the
+   * directory structure.
+   */
+  private void explorePath(Configuration job, Path p,
+      LimitChecker limitChecker, Map<URI, FileStatus> statCache)
+      throws IOException {
+    Path pathWithScheme = p;
+    if (!pathWithScheme.toUri().isAbsolute()) {
+      // the path does not have a scheme, so we assume it is a path from the
+      // local filesystem
+      FileSystem localFs = FileSystem.getLocal(job);
+      pathWithScheme = localFs.makeQualified(p);
+    }
+    FileStatus status = getFileStatus(statCache, job, pathWithScheme);
+    if (status.isDirectory()) {
+      FileStatus[] statusArray =
+          pathWithScheme.getFileSystem(job).listStatus(pathWithScheme);
+      for (FileStatus s : statusArray) {
+        explorePath(job, s.getPath(), limitChecker, statCache);
+      }
+    } else {
+      limitChecker.addFile(pathWithScheme, status.getLen());
+    }
+  }
+
+  @VisibleForTesting
+  FileStatus getFileStatus(Map<URI, FileStatus> statCache,
+      Configuration job, Path p) throws IOException {
+    URI u = p.toUri();
+    FileStatus status = statCache.get(u);
+    if (status == null) {
+      status = p.getFileSystem(job).getFileStatus(p);
+      statCache.put(u, status);
+    }
+    return status;
+  }
+
   // copies a file to the jobtracker filesystem and returns the path where it
   // was copied to
   private Path copyRemoteFiles(Path parentDir, Path originalPath,

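checkLocalizationLimits() runs before any upload, and explorePath() recursively charges every regular file it finds to the LimitChecker (symlinks are assumed absent, per its javadoc). A standalone sketch of that traversal pattern, under the assumption that a plain byte total is all we want; the helper name totalBytes is illustrative and not part of the patch:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Walks a path and totals the size in bytes of every regular file under
  // it, mirroring how explorePath() feeds LimitChecker.addFile().
  static long totalBytes(Configuration conf, Path p) throws IOException {
    FileSystem fs = p.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(p);
    if (!status.isDirectory()) {
      return status.getLen();
    }
    long sum = 0;
    for (FileStatus child : fs.listStatus(p)) {
      sum += totalBytes(conf, child.getPath());
    }
    return sum;
  }
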
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 53e4b38..d740747 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -967,6 +967,34 @@ public interface MRJobConfig {
           128;
 
   /**
+   * The maximum number of resources a map reduce job is allowed to submit for
+   * localization via files, libjars, archives, and jobjar command line
+   * arguments and through the distributed cache. If set to 0 the limit is
+   * ignored.
+   */
+  String MAX_RESOURCES = "mapreduce.job.cache.limit.max-resources";
+  int MAX_RESOURCES_DEFAULT = 0;
+
+  /**
+   * The maximum size (in MB) a map reduce job is allowed to submit for
+   * localization via files, libjars, archives, and jobjar command line
+   * arguments and through the distributed cache. If set to 0 the limit is
+   * ignored.
+   */
+  String MAX_RESOURCES_MB = "mapreduce.job.cache.limit.max-resources-mb";
+  long MAX_RESOURCES_MB_DEFAULT = 0;
+
+  /**
+   * The maximum size (in MB) of a single resource a map reduce job is allowed
+   * to submit for localization via files, libjars, archives, and jobjar
+   * command line arguments and through the distributed cache. If set to 0 the
+   * limit is ignored.
+   */
+  String MAX_SINGLE_RESOURCE_MB =
+      "mapreduce.job.cache.limit.max-single-resource-mb";
+  long MAX_SINGLE_RESOURCE_MB_DEFAULT = 0;
+
+  /**
    * Number of OPPORTUNISTIC Containers per 100 containers that will be
    * requested by the MRAppMaster. The Default value is 0, which implies all
    * maps will be guaranteed. A value of 100 means all maps will be requested

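All three keys share the same convention: a value of 0 (the default) disables that check, and the MB-denominated limits are converted to bytes before being compared against file lengths. A hedged restatement of that gate, with illustrative local variable names:

  long maxResourcesMB = conf.getLong(MRJobConfig.MAX_RESOURCES_MB,
      MRJobConfig.MAX_RESOURCES_MB_DEFAULT);
  // LimitChecker compares byte counts, so the MB limit is scaled up front.
  long maxResourcesBytes = maxResourcesMB * 1024 * 1024;
  // A limit participates only when positive; with all three keys left at 0,
  // checkLocalizationLimits() returns without statting anything.
  boolean checkTotalSize = maxResourcesBytes > 0;
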
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
index 19470e8..73a0330 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
@@ -54,10 +54,23 @@ public class ClientDistributedCacheManager {
   public static void determineTimestampsAndCacheVisibilities(Configuration job)
   throws IOException {
     Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    determineTimestampsAndCacheVisibilities(job, statCache);
+  }
+
+  /**
+   * See ClientDistributedCacheManager#determineTimestampsAndCacheVisibilities(
+   * Configuration).
+   *
+   * @param job Configuration of a job
+   * @param statCache A map containing cached file status objects
+   * @throws IOException if there is a problem with the underlying filesystem
+   */
+  public static void determineTimestampsAndCacheVisibilities(Configuration job,
+      Map<URI, FileStatus> statCache) throws IOException {
     determineTimestamps(job, statCache);
     determineCacheVisibilities(job, statCache);
   }
-  
+
   /**
    * Determines timestamps of files to be cached, and stores those
    * in the configuration.  This is intended to be used internally by JobClient

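The new overload exists so the submission client can reuse the FileStatus objects it gathered while enforcing limits, rather than re-statting every cached file. A sketch of the intended two-pass flow, mirroring the call sites in JobResourceUploader above (uploader and the resource collections are stand-ins):

  Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
  // Pass 1: limit checking populates the cache, one getFileStatus() per file.
  uploader.checkLocalizationLimits(conf, files, libjars, archives, jobJar,
      statCache);
  // Pass 2: timestamp and visibility determination hits the cache instead of
  // issuing a second round of filesystem RPCs.
  ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
      statCache);
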
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 8ba0ec9..33eece3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1916,6 +1916,36 @@
 </property>
 
 <property>
+  <name>mapreduce.job.cache.limit.max-resources</name>
+  <value>0</value>
+  <description>The maximum number of resources a map reduce job is allowed to
+    submit for localization via files, libjars, archives, and jobjar command
+    line arguments and through the distributed cache. If set to 0 the limit is
+    ignored.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.cache.limit.max-resources-mb</name>
+  <value>0</value>
+  <description>The maximum size (in MB) a map reduce job is allowed to submit
+    for localization via files, libjars, archives, and jobjar command line
+    arguments and through the distributed cache. If set to 0 the limit is
+    ignored.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.cache.limit.max-single-resource-mb</name>
+  <value>0</value>
+  <description>The maximum size (in MB) of a single resource a map reduce job
+    is allowed to submit for localization via files, libjars, archives, and
+    jobjar command line arguments and through the distributed cache. If set to
+    0 the limit is ignored.
+  </description>
+</property>
+
+<property>
   <description>
     Value of the xframe-options
   </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
new file mode 100644
index 0000000..36ea57a
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * A class for unit testing JobResourceUploader.
+ */
+public class TestJobResourceUploader {
+
+  @Test
+  public void testAllDefaults() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    runLimitsTest(b.build(), true, null);
+  }
+
+  @Test
+  public void testNoLimitsWithResources() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(1);
+    b.setNumOfTmpArchives(10);
+    b.setNumOfTmpFiles(1);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setSizeOfResource(10);
+    runLimitsTest(b.build(), true, null);
+  }
+
+  @Test
+  public void testAtResourceLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(1);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(1);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxResources(6);
+    runLimitsTest(b.build(), true, null);
+  }
+
+  @Test
+  public void testOverResourceLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(1);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(2);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxResources(6);
+    runLimitsTest(b.build(), false, ResourceViolation.NUMBER_OF_RESOURCES);
+  }
+
+  @Test
+  public void testAtResourcesMBLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(1);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(2);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxResourcesMB(7);
+    b.setSizeOfResource(1);
+    runLimitsTest(b.build(), true, null);
+  }
+
+  @Test
+  public void testOverResourcesMBLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(2);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(2);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxResourcesMB(7);
+    b.setSizeOfResource(1);
+    runLimitsTest(b.build(), false, ResourceViolation.TOTAL_RESOURCE_SIZE);
+  }
+
+  @Test
+  public void testAtSingleResourceMBLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(2);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(2);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxSingleResourceMB(1);
+    b.setSizeOfResource(1);
+    runLimitsTest(b.build(), true, null);
+  }
+
+  @Test
+  public void testOverSingleResourceMBLimit() throws IOException {
+    ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder();
+    b.setNumOfDCArchives(1);
+    b.setNumOfDCFiles(2);
+    b.setNumOfTmpArchives(1);
+    b.setNumOfTmpFiles(2);
+    b.setNumOfTmpLibJars(1);
+    b.setJobJar(true);
+    b.setMaxSingleResourceMB(1);
+    b.setSizeOfResource(10);
+    runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE);
+  }
+
+  private enum ResourceViolation {
+    NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
+  }
+
+  private void runLimitsTest(ResourceLimitsConf rlConf,
+      boolean checkShouldSucceed, ResourceViolation violation)
+      throws IOException {
+
+    if (!checkShouldSucceed && violation == null) {
+      Assert.fail("Test is misconfigured. checkShouldSucceed is set to false"
+          + " and a ResourceViolation is not specified.");
+    }
+
+    JobConf conf = setupJobConf(rlConf);
+    JobResourceUploader uploader = new StubedUploader(conf);
+    long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024;
+    when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes);
+    when(mockedStatus.isDirectory()).thenReturn(false);
+    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
+    try {
+      uploader.checkLocalizationLimits(conf,
+          conf.getStringCollection("tmpfiles"),
+          conf.getStringCollection("tmpjars"),
+          conf.getStringCollection("tmparchives"),
+          conf.getJar(), statCache);
+      Assert.assertTrue("Limits check succeeded when it should have failed.",
+          checkShouldSucceed);
+    } catch (IOException e) {
+      if (checkShouldSucceed) {
+        Assert.fail("Limits check failed when it should have succeeded: " + e);
+      }
+      switch (violation) {
+      case NUMBER_OF_RESOURCES:
+        if (!e.getMessage().contains(
+            JobResourceUploader.MAX_RESOURCE_ERR_MSG)) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      case TOTAL_RESOURCE_SIZE:
+        if (!e.getMessage().contains(
+            JobResourceUploader.MAX_TOTAL_RESOURCE_MB_ERR_MSG)) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      case SINGLE_RESOURCE_SIZE:
+        if (!e.getMessage().contains(
+            JobResourceUploader.MAX_SINGLE_RESOURCE_MB_ERR_MSG)) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      default:
+        Assert.fail("Test failed unexpectedly: " + e);
+        break;
+      }
+    }
+  }
+
+  private final FileStatus mockedStatus = mock(FileStatus.class);
+
+  private JobConf setupJobConf(ResourceLimitsConf rlConf) {
+    JobConf conf = new JobConf();
+    conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources);
+    conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB);
+    conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
+        rlConf.maxSingleResourceMB);
+
+    conf.set("tmpfiles",
+        buildPathString("file://tmpFiles", rlConf.numOfTmpFiles));
+    conf.set("tmpjars",
+        buildPathString("file://tmpjars", rlConf.numOfTmpLibJars));
+    conf.set("tmparchives",
+        buildPathString("file://tmpArchives", rlConf.numOfTmpArchives));
+    conf.set(MRJobConfig.CACHE_ARCHIVES,
+        buildPathString("file://cacheArchives", rlConf.numOfDCArchives));
+    conf.set(MRJobConfig.CACHE_FILES,
+        buildPathString("file://cacheFiles", rlConf.numOfDCFiles));
+    if (rlConf.jobJar) {
+      conf.setJar("file://jobjar.jar");
+    }
+    return conf;
+  }
+
+  private String buildPathString(String pathPrefix, int numOfPaths) {
+    if (numOfPaths < 1) {
+      return "";
+    } else {
+      StringBuilder b = new StringBuilder();
+      b.append(pathPrefix + 0);
+      for (int i = 1; i < numOfPaths; i++) {
+        b.append("," + pathPrefix + i);
+      }
+      return b.toString();
+    }
+  }
+
+  final static class ResourceLimitsConf {
+    private final int maxResources;
+    private final long maxResourcesMB;
+    private final long maxSingleResourceMB;
+    private final int numOfTmpFiles;
+    private final int numOfTmpArchives;
+    private final int numOfTmpLibJars;
+    private final boolean jobJar;
+    private final int numOfDCFiles;
+    private final int numOfDCArchives;
+    private final long sizeOfResource;
+
+    static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf();
+
+    private ResourceLimitsConf() {
+      this(new Builder());
+    }
+
+    private ResourceLimitsConf(Builder builder) {
+      this.maxResources = builder.maxResources;
+      this.maxResourcesMB = builder.maxResourcesMB;
+      this.maxSingleResourceMB = builder.maxSingleResourceMB;
+      this.numOfTmpFiles = builder.numOfTmpFiles;
+      this.numOfTmpArchives = builder.numOfTmpArchives;
+      this.numOfTmpLibJars = builder.numOfTmpLibJars;
+      this.jobJar = builder.jobJar;
+      this.numOfDCFiles = builder.numOfDCFiles;
+      this.numOfDCArchives = builder.numOfDCArchives;
+      this.sizeOfResource = builder.sizeOfResource;
+    }
+
+    static class Builder {
+      // Defaults
+      private int maxResources = 0;
+      private long maxResourcesMB = 0;
+      private long maxSingleResourceMB = 0;
+      private int numOfTmpFiles = 0;
+      private int numOfTmpArchives = 0;
+      private int numOfTmpLibJars = 0;
+      private boolean jobJar = false;
+      private int numOfDCFiles = 0;
+      private int numOfDCArchives = 0;
+      private long sizeOfResource = 0;
+
+      Builder() {
+      }
+
+      Builder setMaxResources(int max) {
+        this.maxResources = max;
+        return this;
+      }
+
+      Builder setMaxResourcesMB(long max) {
+        this.maxResourcesMB = max;
+        return this;
+      }
+
+      Builder setMaxSingleResourceMB(long max) {
+        this.maxSingleResourceMB = max;
+        return this;
+      }
+
+      Builder setNumOfTmpFiles(int num) {
+        this.numOfTmpFiles = num;
+        return this;
+      }
+
+      Builder setNumOfTmpArchives(int num) {
+        this.numOfTmpArchives = num;
+        return this;
+      }
+
+      Builder setNumOfTmpLibJars(int num) {
+        this.numOfTmpLibJars = num;
+        return this;
+      }
+
+      Builder setJobJar(boolean jar) {
+        this.jobJar = jar;
+        return this;
+      }
+
+      Builder setNumOfDCFiles(int num) {
+        this.numOfDCFiles = num;
+        return this;
+      }
+
+      Builder setNumOfDCArchives(int num) {
+        this.numOfDCArchives = num;
+        return this;
+      }
+
+      Builder setSizeOfResource(long sizeMB) {
+        this.sizeOfResource = sizeMB;
+        return this;
+      }
+
+      ResourceLimitsConf build() {
+        return new ResourceLimitsConf(this);
+      }
+    }
+  }
+
+  class StubedUploader extends JobResourceUploader {
+    StubedUploader(JobConf conf) throws IOException {
+      super(FileSystem.getLocal(conf), false);
+    }
+
+    @Override
+    FileStatus getFileStatus(Map<URI, FileStatus> statCache, Configuration job,
+        Path p) throws IOException {
+      return mockedStatus;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f80a7298/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 451ec57..32b3a42 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -138,6 +138,8 @@ public class TestMRJobs {
   static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
   private static final String OUTPUT_ROOT_DIR = "/tmp/" +
     TestMRJobs.class.getSimpleName();
+  private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR,
+      "localizedResources");
 
   @BeforeClass
   public static void setup() throws IOException {
@@ -173,7 +175,7 @@ public class TestMRJobs {
   }
 
   @AfterClass
-  public static void tearDown() {
+  public static void tearDown() throws IOException {
     if (mrCluster != null) {
       mrCluster.stop();
       mrCluster = null;
@@ -182,6 +184,10 @@ public class TestMRJobs {
       dfsCluster.shutdown();
       dfsCluster = null;
     }
+    if (localFs.exists(TEST_RESOURCES_DIR)) {
+      // clean up resource directory
+      localFs.delete(TEST_RESOURCES_DIR, true);
+    }
   }
 
   @After
@@ -189,6 +195,39 @@ public class TestMRJobs {
     numSleepReducers = DEFAULT_REDUCES;
   }
 
+  private static void setupJobResourceDirs() throws IOException {
+    if (localFs.exists(TEST_RESOURCES_DIR)) {
+      // clean up directory
+      localFs.delete(TEST_RESOURCES_DIR, true);
+    }
+
+    localFs.mkdirs(TEST_RESOURCES_DIR);
+    FSDataOutputStream outF1 = null;
+    try {
+      // 10KB file
+      outF1 = localFs.create(new Path(TEST_RESOURCES_DIR, "file1.txt"));
+      outF1.write(new byte[10 * 1024]);
+    } finally {
+      if (outF1 != null) {
+        outF1.close();
+      }
+    }
+    localFs.createNewFile(new Path(TEST_RESOURCES_DIR, "file2.txt"));
+    Path subDir = new Path(TEST_RESOURCES_DIR, "subDir");
+    localFs.mkdirs(subDir);
+    FSDataOutputStream outF3 = null;
+    try {
+      // 1MB (plus 10 Bytes) file
+      outF3 = localFs.create(new Path(subDir, "file3.txt"));
+      outF3.write(new byte[(1 * 1024 * 1024) + 10]);
+    } finally {
+      if (outF3 != null) {
+        outF3.close();
+      }
+    }
+    localFs.createNewFile(new Path(subDir, "file4.txt"));
+  }
+
   @Test (timeout = 300000)
   public void testSleepJob() throws Exception {
     testSleepJobInternal(false);
@@ -199,16 +238,99 @@ public class TestMRJobs {
     testSleepJobInternal(true);
   }
 
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalResourceUnderLimit() throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well above what is expected
+    sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6);
+    sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, false, true, null);
+  }
+
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalResourceSizeOverLimit() throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well below what is expected
+    sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 1);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, false, false,
+        ResourceViolation.TOTAL_RESOURCE_SIZE);
+  }
+
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalResourceNumberOverLimit() throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well below what is expected
+    sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 1);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, false, false,
+        ResourceViolation.NUMBER_OF_RESOURCES);
+  }
+
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalResourceCheckAndRemoteJar()
+      throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well above what is expected
+    sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6);
+    sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, true, true, null);
+  }
+
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalIndividualResourceOverLimit()
+      throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well below what is expected
+    sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 1);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, false, false,
+        ResourceViolation.SINGLE_RESOURCE_SIZE);
+  }
+
+  @Test(timeout = 300000)
+  public void testSleepJobWithLocalIndividualResourceUnderLimit()
+      throws Exception {
+    Configuration sleepConf = new Configuration(mrCluster.getConfig());
+    // set limits to well below what is expected
+    sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 2);
+    setupJobResourceDirs();
+    sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString());
+    testSleepJobInternal(sleepConf, false, true, null);
+  }
+
   private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
+    testSleepJobInternal(new Configuration(mrCluster.getConfig()),
+        useRemoteJar, true, null);
+  }
+
+  private enum ResourceViolation {
+    NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE;
+  }
+
+  private void testSleepJobInternal(Configuration sleepConf,
+      boolean useRemoteJar, boolean jobSubmissionShouldSucceed,
+      ResourceViolation violation) throws Exception {
     LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
 
+    if (!jobSubmissionShouldSucceed && violation == null) {
+      Assert.fail("Test is misconfigured. jobSubmissionShouldSucceed is set"
+          + " to false and a ResourceViolation is not specified.");
+    }
+
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
                + " not found. Not running test.");
       return;
     }
 
-    Configuration sleepConf = new Configuration(mrCluster.getConfig());
     // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");   
     
@@ -229,7 +351,45 @@ public class TestMRJobs {
       job.setJarByClass(SleepJob.class);
     }
     job.setMaxMapAttempts(1); // speed up failures
-    job.submit();
+    try {
+      job.submit();
+      Assert.assertTrue("JobSubmission succeeded when it should have failed.",
+          jobSubmissionShouldSucceed);
+    } catch (IOException e) {
+      if (jobSubmissionShouldSucceed) {
+        Assert
+            .fail("Job submission failed when it should have succeeded: " + e);
+      }
+      switch (violation) {
+      case NUMBER_OF_RESOURCES:
+        if (!e.getMessage().contains(
+            "This job has exceeded the maximum number of"
+                + " submitted resources")) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      case TOTAL_RESOURCE_SIZE:
+        if (!e.getMessage().contains(
+            "This job has exceeded the maximum size of submitted resources")) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      case SINGLE_RESOURCE_SIZE:
+        if (!e.getMessage().contains(
+            "This job has exceeded the maximum size of a single submitted")) {
+          Assert.fail("Test failed unexpectedly: " + e);
+        }
+        break;
+
+      default:
+        Assert.fail("Test failed unexpectedly: " + e);
+        break;
+      }
+      // we are done with the test (job submission failed)
+      return;
+    }
     String trackingUrl = job.getTrackingURL();
     String jobId = job.getJobID().toString();
     boolean succeeded = job.waitForCompletion(true);

