This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 53a530aa88c3a513de34fa579b1814a5ac5dcbd1
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Sat Jan 8 20:48:10 2022 +0530

    MAPREDUCE-7371. DistributedCache alternative APIs should not use DistributedCache APIs internally (#3855)
    
    Contributed by Viraj Jasani
---
 .../mapred/LocalDistributedCacheManager.java       |   5 +-
 .../apache/hadoop/mapreduce/v2/util/MRApps.java    |  25 ++--
 .../mapred/TestLocalDistributedCacheManager.java   |   7 +-
 .../hadoop/mapred/TestMRWithDistributedCache.java  |  44 +++----
 .../hadoop/mapreduce/v2/util/TestMRApps.java       |  13 +-
 .../apache/hadoop/mapred/pipes/Application.java    |   4 +-
 .../org/apache/hadoop/mapred/pipes/Submitter.java  |   7 +-
 .../main/java/org/apache/hadoop/mapreduce/Job.java | 121 ++++++++++++++++--
 .../hadoop/mapreduce/JobResourceUploader.java      |  13 +-
 .../org/apache/hadoop/mapreduce/JobSubmitter.java  |   2 +-
 .../filecache/ClientDistributedCacheManager.java   |  13 +-
 .../mapreduce/filecache/DistributedCache.java      |  96 +++-----------
 .../hadoop/mapreduce/task/JobContextImpl.java      | 138 +++++++++++++++++++--
 .../java/org/apache/hadoop/mapred/MRCaching.java   |  17 +--
 .../hadoop/mapreduce/v2/TestMRAppWithCombiner.java |   4 +-
 .../org/apache/hadoop/streaming/StreamJob.java     |  11 +-
 16 files changed, 344 insertions(+), 176 deletions(-)
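
In short: the deprecated static helpers on
org.apache.hadoop.mapreduce.filecache.DistributedCache now delegate to new
static equivalents, with mutators on Job and accessors on JobContextImpl, and
internal callers switch to the new methods directly. A minimal before/after
sketch of caller code (the driver class and the URIs below are hypothetical,
not part of the patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;

    public class CacheSetupSketch {
      static void configure(Configuration conf) throws Exception {
        URI jar = new URI("hdfs:///libs/dep.jar"); // hypothetical path

        // Before (deprecated, and previously also used internally):
        //   DistributedCache.addCacheFile(jar, conf);
        //   URI[] files = DistributedCache.getCacheFiles(conf);

        // After: mutators live on Job ...
        Job.addCacheFile(jar, conf);
        Job.addCacheArchive(new URI("hdfs:///libs/dep.tgz"), conf);

        // ... and accessors on JobContextImpl.
        URI[] files = JobContextImpl.getCacheFiles(conf);
        Path[] fileClassPaths = JobContextImpl.getFileClassPaths(conf);
      }
    }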

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 8f17ffda422..e5a50f013ff 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -95,13 +94,13 @@ class LocalDistributedCacheManager {
 
     // Find which resources are to be put on the local classpath
     Map<String, Path> classpaths = new HashMap<String, Path>();
-    Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
+    Path[] archiveClassPaths = JobContextImpl.getArchiveClassPaths(conf);
     if (archiveClassPaths != null) {
       for (Path p : archiveClassPaths) {
         classpaths.put(p.toUri().getPath().toString(), p);
       }
     }
-    Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
+    Path[] fileClassPaths = JobContextImpl.getFileClassPaths(conf);
     if (fileClassPaths != null) {
       for (Path p : fileClassPaths) {
         classpaths.put(p.toUri().getPath().toString(), p);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index 41a8458ed3f..6db56c6a3e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -272,12 +273,12 @@ public class MRApps extends Apps {
        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
     // a * in the classpath will only find a .jar, so we need to filter out
     // all .jars and add everything else
-    addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
-        DistributedCache.getCacheFiles(conf),
+    addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf),
+        JobContextImpl.getCacheFiles(conf),
         conf,
         environment, classpathEnvVar);
-    addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
-        DistributedCache.getCacheArchives(conf),
+    addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf),
+        JobContextImpl.getCacheArchives(conf),
         conf,
         environment, classpathEnvVar);
     if (userClassesTakesPrecedence) {
@@ -483,8 +484,8 @@ public class MRApps extends Apps {
 
     // Cache archives
     lrb.setType(LocalResourceType.ARCHIVE);
-    lrb.setUris(DistributedCache.getCacheArchives(conf));
-    lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
+    lrb.setUris(JobContextImpl.getCacheArchives(conf));
+    lrb.setTimestamps(JobContextImpl.getArchiveTimestamps(conf));
     lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
     lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
     lrb.setSharedCacheUploadPolicies(
@@ -493,8 +494,8 @@ public class MRApps extends Apps {
     
     // Cache files
     lrb.setType(LocalResourceType.FILE);
-    lrb.setUris(DistributedCache.getCacheFiles(conf));
-    lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
+    lrb.setUris(JobContextImpl.getCacheFiles(conf));
+    lrb.setTimestamps(JobContextImpl.getFileTimestamps(conf));
     lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
     lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
     lrb.setSharedCacheUploadPolicies(
@@ -504,9 +505,9 @@ public class MRApps extends Apps {
 
   /**
    * Set up the DistributedCache related configs to make
-   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * {@link JobContextImpl#getLocalCacheFiles(Configuration)}
    * and
-   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * {@link JobContextImpl#getLocalCacheArchives(Configuration)}
    * working.
    * @param conf
    * @throws java.io.IOException
@@ -518,7 +519,7 @@ public class MRApps extends Apps {
     //        ^ ^ all symlinks are created in the current work-dir
 
     // Update the configuration object with localized archives.
-    URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
+    URI[] cacheArchives = JobContextImpl.getCacheArchives(conf);
     if (cacheArchives != null) {
       List<String> localArchives = new ArrayList<String>();
       for (int i = 0; i < cacheArchives.length; ++i) {
@@ -538,7 +539,7 @@ public class MRApps extends Apps {
     }
 
     // Update the configuration object with localized files.
-    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+    URI[] cacheFiles = JobContextImpl.getCacheFiles(conf);
     if (cacheFiles != null) {
       List<String> localFiles = new ArrayList<String>();
       for (int i = 0; i < cacheFiles.length; ++i) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index b565d23029b..5d1c669e500 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -174,7 +173,7 @@ public class TestLocalDistributedCacheManager {
       }
     });
 
-    DistributedCache.addCacheFile(file, conf);
+    Job.addCacheFile(file, conf);
     Map<String, Boolean> policies = new HashMap<String, Boolean>();
     policies.put(file.toString(), true);
     Job.setFileSharedCacheUploadPolicies(conf, policies);
@@ -286,8 +285,8 @@ public class TestLocalDistributedCacheManager {
       }
     });
 
-    DistributedCache.addCacheFile(file, conf);
-    DistributedCache.addCacheFile(file, conf);
+    Job.addCacheFile(file, conf);
+    Job.addCacheFile(file, conf);
     Map<String, Boolean> policies = new HashMap<String, Boolean>();
     policies.put(file.toString(), true);
     Job.setFileSharedCacheUploadPolicies(conf, policies);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
index 2f0a6bdc009..62b8815e6f1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
@@ -229,60 +229,60 @@ public class TestMRWithDistributedCache {
     Assert.assertEquals("Test Local Archives 1",
         conf.get(DistributedCache.CACHE_LOCALARCHIVES));
     Assert.assertEquals(1,
-        DistributedCache.getLocalCacheArchives(conf).length);
+        JobContextImpl.getLocalCacheArchives(conf).length);
     Assert.assertEquals("Test Local Archives 1",
-        DistributedCache.getLocalCacheArchives(conf)[0].getName());
+        JobContextImpl.getLocalCacheArchives(conf)[0].getName());
     DistributedCache.addLocalArchives(conf, "Test Local Archives 2");
     Assert.assertEquals("Test Local Archives 1,Test Local Archives 2",
         conf.get(DistributedCache.CACHE_LOCALARCHIVES));
     Assert.assertEquals(2,
-        DistributedCache.getLocalCacheArchives(conf).length);
+        JobContextImpl.getLocalCacheArchives(conf).length);
     Assert.assertEquals("Test Local Archives 2",
-        DistributedCache.getLocalCacheArchives(conf)[1].getName());
+        JobContextImpl.getLocalCacheArchives(conf)[1].getName());
     DistributedCache.setLocalArchives(conf, "Test Local Archives 3");
     Assert.assertEquals("Test Local Archives 3",
         conf.get(DistributedCache.CACHE_LOCALARCHIVES));
     Assert.assertEquals(1,
-        DistributedCache.getLocalCacheArchives(conf).length);
+        JobContextImpl.getLocalCacheArchives(conf).length);
     Assert.assertEquals("Test Local Archives 3",
-        DistributedCache.getLocalCacheArchives(conf)[0].getName());
+        JobContextImpl.getLocalCacheArchives(conf)[0].getName());
 
     DistributedCache.addLocalFiles(conf, "Test Local Files 1");
     Assert.assertEquals("Test Local Files 1",
         conf.get(DistributedCache.CACHE_LOCALFILES));
     Assert.assertEquals(1,
-        DistributedCache.getLocalCacheFiles(conf).length);
+        JobContextImpl.getLocalCacheFiles(conf).length);
     Assert.assertEquals("Test Local Files 1",
-        DistributedCache.getLocalCacheFiles(conf)[0].getName());
+        JobContextImpl.getLocalCacheFiles(conf)[0].getName());
     DistributedCache.addLocalFiles(conf, "Test Local Files 2");
     Assert.assertEquals("Test Local Files 1,Test Local Files 2",
         conf.get(DistributedCache.CACHE_LOCALFILES));
     Assert.assertEquals(2,
-        DistributedCache.getLocalCacheFiles(conf).length);
+        JobContextImpl.getLocalCacheFiles(conf).length);
     Assert.assertEquals("Test Local Files 2",
-        DistributedCache.getLocalCacheFiles(conf)[1].getName());
+        JobContextImpl.getLocalCacheFiles(conf)[1].getName());
     DistributedCache.setLocalFiles(conf, "Test Local Files 3");
     Assert.assertEquals("Test Local Files 3",
         conf.get(DistributedCache.CACHE_LOCALFILES));
     Assert.assertEquals(1,
-        DistributedCache.getLocalCacheFiles(conf).length);
+        JobContextImpl.getLocalCacheFiles(conf).length);
     Assert.assertEquals("Test Local Files 3",
-        DistributedCache.getLocalCacheFiles(conf)[0].getName());
+        JobContextImpl.getLocalCacheFiles(conf)[0].getName());
 
     DistributedCache.setArchiveTimestamps(conf, "1234567890");
     Assert.assertEquals(1234567890,
         conf.getLong(DistributedCache.CACHE_ARCHIVES_TIMESTAMPS, 0));
     Assert.assertEquals(1,
-        DistributedCache.getArchiveTimestamps(conf).length);
+        JobContextImpl.getArchiveTimestamps(conf).length);
     Assert.assertEquals(1234567890,
-        DistributedCache.getArchiveTimestamps(conf)[0]);
+        JobContextImpl.getArchiveTimestamps(conf)[0]);
     DistributedCache.setFileTimestamps(conf, "1234567890");
     Assert.assertEquals(1234567890,
         conf.getLong(DistributedCache.CACHE_FILES_TIMESTAMPS, 0));
     Assert.assertEquals(1,
-        DistributedCache.getFileTimestamps(conf).length);
+        JobContextImpl.getFileTimestamps(conf).length);
     Assert.assertEquals(1234567890,
-        DistributedCache.getFileTimestamps(conf)[0]);
+        JobContextImpl.getFileTimestamps(conf)[0]);
 
     DistributedCache.createAllSymlink(conf, new File("Test Job Cache Dir"),
         new File("Test Work Dir"));
@@ -297,18 +297,18 @@ public class TestMRWithDistributedCache {
         DistributedCache.getTimestamp(conf, symlinkFile.toURI()));
     Assert.assertTrue(symlinkFile.delete());
 
-    DistributedCache.addCacheArchive(symlinkFile.toURI(), conf);
+    Job.addCacheArchive(symlinkFile.toURI(), conf);
     Assert.assertEquals(symlinkFile.toURI().toString(),
         conf.get(DistributedCache.CACHE_ARCHIVES));
-    Assert.assertEquals(1, DistributedCache.getCacheArchives(conf).length);
+    Assert.assertEquals(1, JobContextImpl.getCacheArchives(conf).length);
     Assert.assertEquals(symlinkFile.toURI(),
-        DistributedCache.getCacheArchives(conf)[0]);
+        JobContextImpl.getCacheArchives(conf)[0]);
 
-    DistributedCache.addCacheFile(symlinkFile.toURI(), conf);
+    Job.addCacheFile(symlinkFile.toURI(), conf);
     Assert.assertEquals(symlinkFile.toURI().toString(),
         conf.get(DistributedCache.CACHE_FILES));
-    Assert.assertEquals(1, DistributedCache.getCacheFiles(conf).length);
+    Assert.assertEquals(1, JobContextImpl.getCacheFiles(conf).length);
     Assert.assertEquals(symlinkFile.toURI(),
-        DistributedCache.getCacheFiles(conf)[0]);
+        JobContextImpl.getCacheFiles(conf)[0]);
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
index c6a287439da..62235b72673 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -377,11 +376,11 @@ public class TestMRApps {
     when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
     when(mockFs.resolvePath(filePath)).thenReturn(filePath);
     
-    DistributedCache.addCacheArchive(archive, conf);
+    Job.addCacheArchive(archive, conf);
     conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
     conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
     conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
-    DistributedCache.addCacheFile(file, conf);
+    Job.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
@@ -416,8 +415,8 @@ public class TestMRApps {
     when(mockFs.resolvePath(filePath)).thenReturn(filePath);
     when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
     
-    DistributedCache.addCacheFile(file, conf);
-    DistributedCache.addCacheFile(file2, conf);
+    Job.addCacheFile(file, conf);
+    Job.addCacheFile(file2, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
@@ -452,11 +451,11 @@ public class TestMRApps {
     when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
     when(mockFs.resolvePath(filePath)).thenReturn(filePath);
     
-    DistributedCache.addCacheArchive(archive, conf);
+    Job.addCacheArchive(archive, conf);
     conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
     conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
     conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
-    DistributedCache.addCacheFile(file, conf);
+    Job.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
index 5416d269368..d56963d0b74 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
@@ -48,11 +48,11 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -117,7 +117,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
     if (interpretor != null) {
       cmd.add(interpretor);
     }
-    String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
+    String executable = JobContextImpl.getLocalCacheFiles(conf)[0].toString();
     if (!FileUtil.canExecute(new File(executable))) {
      // LinuxTaskController sets +x permissions on all distcache files already.
       // In case of DefaultTaskController, set permissions here.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
index 64448c1c460..a9f42dd1667 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
@@ -54,8 +54,9 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.LazyOutputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -319,7 +320,7 @@ public class Submitter extends Configured implements Tool {
       setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
       setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT,defScript);
     }
-    URI[] fileCache = DistributedCache.getCacheFiles(conf);
+    URI[] fileCache = JobContextImpl.getCacheFiles(conf);
     if (fileCache == null) {
       fileCache = new URI[1];
     } else {
@@ -334,7 +335,7 @@ public class Submitter extends Configured implements Tool {
       ie.initCause(e);
       throw ie;
     }
-    DistributedCache.setCacheFiles(fileCache, conf);
+    Job.setCacheFiles(fileCache, conf);
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index c9fe2c377f0..e0143da7c9a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -1121,7 +1121,18 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   public void setCacheArchives(URI[] archives) {
     ensureState(JobState.DEFINE);
-    DistributedCache.setCacheArchives(archives, conf);
+    setCacheArchives(archives, conf);
+  }
+
+  /**
+   * Set the configuration with the given set of archives.
+   *
+   * @param archives The list of archives that need to be localized.
+   * @param conf Configuration which will be changed.
+   */
+  public static void setCacheArchives(URI[] archives, Configuration conf) {
+    String cacheArchives = StringUtils.uriToString(archives);
+    conf.set(MRJobConfig.CACHE_ARCHIVES, cacheArchives);
   }
 
   /**
@@ -1130,7 +1141,18 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   public void setCacheFiles(URI[] files) {
     ensureState(JobState.DEFINE);
-    DistributedCache.setCacheFiles(files, conf);
+    setCacheFiles(files, conf);
+  }
+
+  /**
+   * Set the configuration with the given set of files.
+   *
+   * @param files The list of files that need to be localized.
+   * @param conf Configuration which will be changed.
+   */
+  public static void setCacheFiles(URI[] files, Configuration conf) {
+    String cacheFiles = StringUtils.uriToString(files);
+    conf.set(MRJobConfig.CACHE_FILES, cacheFiles);
   }
 
   /**
@@ -1139,16 +1161,53 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   public void addCacheArchive(URI uri) {
     ensureState(JobState.DEFINE);
-    DistributedCache.addCacheArchive(uri, conf);
+    addCacheArchive(uri, conf);
   }
-  
+
+  /**
+   * Add an archive to be localized to the conf.
+   *
+   * @param uri  The uri of the cache to be localized.
+   * @param conf Configuration to add the cache to.
+   */
+  public static void addCacheArchive(URI uri, Configuration conf) {
+    String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
+    conf.set(MRJobConfig.CACHE_ARCHIVES,
+        archives == null ? uri.toString() : archives + "," + uri.toString());
+  }
+
   /**
    * Add a file to be localized
    * @param uri The uri of the cache to be localized
    */
   public void addCacheFile(URI uri) {
     ensureState(JobState.DEFINE);
-    DistributedCache.addCacheFile(uri, conf);
+    addCacheFile(uri, conf);
+  }
+
+  /**
+   * Add a file to be localized to the conf. The localized file will be
+   * downloaded to the execution node(s), and a link will be created to the
+   * file from the job's working directory. If the last part of URI's path name
+   * is "*", then the entire parent directory will be localized and links
+   * will be created from the job's working directory to each file in the
+   * parent directory.
+   * <p>
+   * The access permissions of the file will determine whether the localized
+   * file will be shared across jobs. If the file is not readable by other or
+   * if any of its parent directories is not executable by other, then the
+   * file will not be shared. In the case of a path that ends in "/*",
+   * sharing of the localized files will be determined solely from the
+   * access permissions of the parent directories. The access permissions of
+   * the individual files will be ignored.
+   *
+   * @param uri  The uri of the cache to be localized.
+   * @param conf Configuration to add the cache to.
+   */
+  public static void addCacheFile(URI uri, Configuration conf) {
+    String files = conf.get(MRJobConfig.CACHE_FILES);
+    conf.set(MRJobConfig.CACHE_FILES,
+        files == null ? uri.toString() : files + "," + uri.toString());
   }
 
   /**
@@ -1165,7 +1224,39 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
   public void addFileToClassPath(Path file)
     throws IOException {
     ensureState(JobState.DEFINE);
-    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
+    addFileToClassPath(file, conf, file.getFileSystem(conf));
+  }
+
+  /**
+   * Add a file path to the current set of classpath entries. The file will
+   * also be added to the cache.
+   *
+   * @param file Path of the file to be added.
+   * @param conf Configuration that contains the classpath setting.
+   * @param fs FileSystem with respect to which {@code file} should be interpreted.
+   */
+  public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) {
+    addFileToClassPath(file, conf, fs, true);
+  }
+
+  /**
+   * Add a file path to the current set of classpath entries. The file will
+   * also be added to the cache if {@code addToCache} is true.
+   *
+   * @param file Path of the file to be added.
+   * @param conf Configuration that contains the classpath setting.
+   * @param fs FileSystem with respect to which {@code file} should be interpreted.
+   * @param addToCache Whether the file should also be added to the cache list.
+   */
+  public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs,
+      boolean addToCache) {
+    String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
+    conf.set(MRJobConfig.CLASSPATH_FILES,
+        classpath == null ? file.toString() : classpath + "," + file.toString());
+    if (addToCache) {
+      URI uri = fs.makeQualified(file).toUri();
+      Job.addCacheFile(uri, conf);
+    }
   }
 
   /**
@@ -1180,7 +1271,23 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
   public void addArchiveToClassPath(Path archive)
     throws IOException {
     ensureState(JobState.DEFINE);
-    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
+    addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
+  }
+
+  /**
+   * Add an archive path to the current set of classpath entries. It adds the
+   * archive to cache as well.
+   *
+   * @param archive Path of the archive to be added.
+   * @param conf Configuration that contains the classpath setting.
+   * @param fs FileSystem with respect to which {@code archive} should be interpreted.
+   */
+  public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs) {
+    String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
+    conf.set(MRJobConfig.CLASSPATH_ARCHIVES,
+        classpath == null ? archive.toString() : classpath + "," + archive.toString());
+    URI uri = fs.makeQualified(archive).toUri();
+    Job.addCacheArchive(uri, conf);
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index a7cdf34d2ac..d26354913a6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -332,13 +332,12 @@ class JobResourceUploader {
           // separately.
           foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
         }
-        DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
-            jtFs, false);
+        Job.addFileToClassPath(new Path(newURI.getPath()), conf, jtFs, false);
         if (fromSharedCache) {
           // We simply add this URI to the distributed cache. It will not come
           // from the staging directory (it is in the shared cache), so we
           // must add it to the cache regardless of the wildcard feature.
-          DistributedCache.addCacheFile(newURI, conf);
+          Job.addCacheFile(newURI, conf);
         } else {
           libjarURIs.add(newURI);
         }
@@ -352,10 +351,10 @@ class JobResourceUploader {
         // Add the whole directory to the cache using a wild card
         Path libJarsDirWildcard =
            jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD));
-        DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
+        Job.addCacheFile(libJarsDirWildcard.toUri(), conf);
       } else {
         for (URI uri : libjarURIs) {
-          DistributedCache.addCacheFile(uri, conf);
+          Job.addCacheFile(uri, conf);
         }
       }
     }
@@ -847,8 +846,8 @@ class JobResourceUploader {
       }
       Path tmp = new Path(tmpURI);
       Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
-      DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()),
-          conf);
+      Path path = new Path(newPath.toUri().getPath());
+      Job.addFileToClassPath(path, conf, path.getFileSystem(conf));
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index dd6c9220464..4c983178a7f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -466,7 +466,7 @@ class JobSubmitter {
         throw new IllegalArgumentException(e);
       }
 
-      DistributedCache.addCacheArchive(uri, conf);
+      Job.addCacheArchive(uri, conf);
     }
   }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
index ada14db944c..5e20bbe40dc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.security.Credentials;
 
 /**
@@ -82,7 +83,7 @@ public class ClientDistributedCacheManager {
    */
   public static void determineTimestamps(Configuration job,
       Map<URI, FileStatus> statCache) throws IOException {
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    URI[] tarchives = JobContextImpl.getCacheArchives(job);
     if (tarchives != null) {
       FileStatus status = getFileStatus(job, tarchives[0], statCache);
       StringBuilder archiveFileSizes =
@@ -100,7 +101,7 @@ public class ClientDistributedCacheManager {
       setArchiveTimestamps(job, archiveTimestamps.toString());
     }
   
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    URI[] tfiles = JobContextImpl.getCacheFiles(job);
     if (tfiles != null) {
       FileStatus status = getFileStatus(job, tfiles[0], statCache);
       StringBuilder fileSizes =
@@ -127,8 +128,8 @@ public class ClientDistributedCacheManager {
    */
   public static void getDelegationTokens(Configuration job,
       Credentials credentials) throws IOException {
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    URI[] tarchives = JobContextImpl.getCacheArchives(job);
+    URI[] tfiles = JobContextImpl.getCacheFiles(job);
     
     int size = (tarchives!=null? tarchives.length : 0) + (tfiles!=null ? tfiles.length :0);
     Path[] ps = new Path[size];
@@ -159,7 +160,7 @@ public class ClientDistributedCacheManager {
    */
   public static void determineCacheVisibilities(Configuration job,
       Map<URI, FileStatus> statCache) throws IOException {
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
+    URI[] tarchives = JobContextImpl.getCacheArchives(job);
     if (tarchives != null) {
       StringBuilder archiveVisibilities =
        new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache)));
@@ -169,7 +170,7 @@ public class ClientDistributedCacheManager {
       }
       setArchiveVisibilities(job, archiveVisibilities.toString());
     }
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
+    URI[] tfiles = JobContextImpl.getCacheFiles(job);
     if (tfiles != null) {
       StringBuilder fileVisibilities =
         new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache)));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
index 532e3ad3d7c..dd11d490ab4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
@@ -145,8 +146,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static void setCacheArchives(URI[] archives, Configuration conf) {
-    String sarchives = StringUtils.uriToString(archives);
-    conf.set(MRJobConfig.CACHE_ARCHIVES, sarchives);
+    Job.setCacheArchives(archives, conf);
   }
 
   /**
@@ -159,8 +159,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static void setCacheFiles(URI[] files, Configuration conf) {
-    String sfiles = StringUtils.uriToString(files);
-    conf.set(MRJobConfig.CACHE_FILES, sfiles);
+    Job.setCacheFiles(files, conf);
   }
 
   /**
@@ -174,7 +173,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static URI[] getCacheArchives(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES));
+    return JobContextImpl.getCacheArchives(conf);
   }
 
   /**
@@ -188,7 +187,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES));
+    return JobContextImpl.getCacheFiles(conf);
   }
 
   /**
@@ -201,10 +200,8 @@ public class DistributedCache {
    * @see JobContext#getLocalCacheArchives()
    */
   @Deprecated
-  public static Path[] getLocalCacheArchives(Configuration conf)
-    throws IOException {
-    return StringUtils.stringToPath(conf
-                                    .getStrings(MRJobConfig.CACHE_LOCALARCHIVES));
+  public static Path[] getLocalCacheArchives(Configuration conf) throws IOException {
+    return JobContextImpl.getLocalCacheArchives(conf);
   }
 
   /**
@@ -219,23 +216,7 @@ public class DistributedCache {
   @Deprecated
   public static Path[] getLocalCacheFiles(Configuration conf)
     throws IOException {
-    return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
-  }
-
-  /**
-   * Parse a list of strings into longs.
-   * @param strs the list of strings to parse
-   * @return a list of longs that were parsed. same length as strs.
-   */
-  private static long[] parseTimestamps(String[] strs) {
-    if (strs == null) {
-      return null;
-    }
-    long[] result = new long[strs.length];
-    for(int i=0; i < strs.length; ++i) {
-      result[i] = Long.parseLong(strs[i]);
-    }
-    return result;
+    return JobContextImpl.getLocalCacheFiles(conf);
   }
 
   /**
@@ -248,8 +229,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static long[] getArchiveTimestamps(Configuration conf) {
-    return parseTimestamps(
-        conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
+    return JobContextImpl.getArchiveTimestamps(conf);
   }
 
 
@@ -263,8 +243,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static long[] getFileTimestamps(Configuration conf) {
-    return parseTimestamps(
-        conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
+    return JobContextImpl.getFileTimestamps(conf);
   }
 
   /**
@@ -277,9 +256,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static void addCacheArchive(URI uri, Configuration conf) {
-    String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
-    conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
-             : archives + "," + uri.toString());
+    Job.addCacheArchive(uri, conf);
   }
 
   /**
@@ -307,9 +284,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static void addCacheFile(URI uri, Configuration conf) {
-    String files = conf.get(MRJobConfig.CACHE_FILES);
-    conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
-             + uri.toString());
+    Job.addCacheFile(uri, conf);
   }
 
   /**
@@ -323,9 +298,8 @@ public class DistributedCache {
    * @see Job#addFileToClassPath(Path)
    */
   @Deprecated
-  public static void addFileToClassPath(Path file, Configuration conf)
-    throws IOException {
-         addFileToClassPath(file, conf, file.getFileSystem(conf));
-  public static void addFileToClassPath(Path file, Configuration conf) throws IOException {
+    Job.addFileToClassPath(file, conf, file.getFileSystem(conf));
   }
 
   /**
@@ -340,7 +314,7 @@ public class DistributedCache {
    */
   public static void addFileToClassPath(Path file, Configuration conf,
       FileSystem fs) {
-    addFileToClassPath(file, conf, fs, true);
+    Job.addFileToClassPath(file, conf, fs, true);
   }
 
   /**
@@ -357,14 +331,7 @@ public class DistributedCache {
    */
   public static void addFileToClassPath(Path file, Configuration conf,
       FileSystem fs, boolean addToCache) {
-    String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
-    conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
-             : classpath + "," + file.toString());
-
-    if (addToCache) {
-      URI uri = fs.makeQualified(file).toUri();
-      addCacheFile(uri, conf);
-    }
+    Job.addFileToClassPath(file, conf, fs, addToCache);
   }
 
   /**
@@ -377,16 +344,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static Path[] getFileClassPaths(Configuration conf) {
-    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
-                                MRJobConfig.CLASSPATH_FILES);
-    if (list.size() == 0) {
-      return null;
-    }
-    Path[] paths = new Path[list.size()];
-    for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path(list.get(i));
-    }
-    return paths;
+    return JobContextImpl.getFileClassPaths(conf);
   }
 
   /**
@@ -401,7 +359,7 @@ public class DistributedCache {
   @Deprecated
   public static void addArchiveToClassPath(Path archive, Configuration conf)
     throws IOException {
-    addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
+    Job.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
   }
 
   /**
@@ -415,12 +373,7 @@ public class DistributedCache {
   public static void addArchiveToClassPath
          (Path archive, Configuration conf, FileSystem fs)
       throws IOException {
-    String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
-    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, classpath == null ? archive
-             .toString() : classpath + "," + archive.toString());
-    URI uri = fs.makeQualified(archive).toUri();
-
-    addCacheArchive(uri, conf);
+    Job.addArchiveToClassPath(archive, conf, fs);
   }
 
   /**
@@ -433,16 +386,7 @@ public class DistributedCache {
    */
   @Deprecated
   public static Path[] getArchiveClassPaths(Configuration conf) {
-    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
-                                MRJobConfig.CLASSPATH_ARCHIVES);
-    if (list.size() == 0) {
-      return null;
-    }
-    Path[] paths = new Path[list.size()];
-    for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path(list.get(i));
-    }
-    return paths;
+    return JobContextImpl.getArchiveClassPaths(conf);
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
index 1696246b843..41d03bb0027 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.task;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
@@ -305,7 +307,27 @@ public class JobContextImpl implements JobContext {
    * Get the archive entries in classpath as an array of Path
    */
   public Path[] getArchiveClassPaths() {
-    return DistributedCache.getArchiveClassPaths(conf);
+    return getArchiveClassPaths(conf);
+  }
+
+  /**
+   * Get the archive entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
+   *
+   * @param conf Configuration that contains the classpath setting.
+   * @return An array of Path consisting of archive entries in classpath.
+   */
+  public static Path[] getArchiveClassPaths(Configuration conf) {
+    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+        MRJobConfig.CLASSPATH_ARCHIVES);
+    if (list.size() == 0) {
+      return null;
+    }
+    Path[] paths = new Path[list.size()];
+    for (int i = 0; i < list.size(); i++) {
+      paths[i] = new Path(list.get(i));
+    }
+    return paths;
   }
 
   /**
@@ -314,7 +336,18 @@ public class JobContextImpl implements JobContext {
    * @throws IOException
    */
   public URI[] getCacheArchives() throws IOException {
-    return DistributedCache.getCacheArchives(conf);
+    return getCacheArchives(conf);
+  }
+
+  /**
+   * Get cache archives set in the Configuration. Used by
+   * internal DistributedCache and JobContextImpl code.
+   *
+   * @param conf The configuration which contains the archives.
+   * @return A URI array of the caches set in the Configuration.
+   */
+  public static URI[] getCacheArchives(Configuration conf) {
+    return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_ARCHIVES));
   }
 
   /**
@@ -324,7 +357,18 @@ public class JobContextImpl implements JobContext {
    */
 
   public URI[] getCacheFiles() throws IOException {
-    return DistributedCache.getCacheFiles(conf);
+    return getCacheFiles(conf);
+  }
+
+  /**
+   * Get cache files set in the Configuration. Used by internal
+   * DistributedCache and MapReduce code.
+   *
+   * @param conf The configuration which contains the files.
+   * @return A URI array of the files set in the Configuration.
+   */
+  public static URI[] getCacheFiles(Configuration conf) {
+    return StringUtils.stringToURI(conf.getStrings(MRJobConfig.CACHE_FILES));
   }
 
   /**
@@ -334,7 +378,17 @@ public class JobContextImpl implements JobContext {
    */
   public Path[] getLocalCacheArchives()
     throws IOException {
-    return DistributedCache.getLocalCacheArchives(conf);
+    return getLocalCacheArchives(conf);
+  }
+
+  /**
+   * Return the path array of the localized caches.
+   *
+   * @param conf Configuration that contains the localized archives.
+   * @return A path array of localized caches.
+   */
+  public static Path[] getLocalCacheArchives(Configuration conf) {
+    return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALARCHIVES));
   }
 
   /**
@@ -344,14 +398,82 @@ public class JobContextImpl implements JobContext {
    */
   public Path[] getLocalCacheFiles()
     throws IOException {
-    return DistributedCache.getLocalCacheFiles(conf);
+    return getLocalCacheFiles(conf);
+  }
+
+  /**
+   * Return the path array of the localized files.
+   *
+   * @param conf Configuration that contains the localized files.
+   * @return A path array of localized files.
+   */
+  public static Path[] getLocalCacheFiles(Configuration conf) {
+    return StringUtils.stringToPath(conf.getStrings(MRJobConfig.CACHE_LOCALFILES));
+  }
+
+  /**
+   * Parse a list of strings into longs.
+   * @param strs the list of strings to parse
+   * @return a list of longs that were parsed. same length as strs.
+   */
+  private static long[] parseTimestamps(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  /**
+   * Get the timestamps of the archives. Used by internal
+   * DistributedCache and MapReduce code.
+   *
+   * @param conf The configuration which stored the timestamps.
+   * @return a long array of timestamps.
+   */
+  public static long[] getArchiveTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
+  }
+
+  /**
+   * Get the timestamps of the files. Used by internal
+   * DistributedCache and MapReduce code.
+   *
+   * @param conf The configuration which stored the timestamps.
+   * @return a long array of timestamps.
+   */
+  public static long[] getFileTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
   }
 
   /**
    * Get the file entries in classpath as an array of Path
    */
   public Path[] getFileClassPaths() {
-    return DistributedCache.getFileClassPaths(conf);
+    return getFileClassPaths(conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
+   *
+   * @param conf Configuration that contains the classpath setting.
+   * @return Array of Path consisting of file entries in the classpath.
+   */
+  public static Path[] getFileClassPaths(Configuration conf) {
+    ArrayList<String> list =
+        (ArrayList<String>) conf.getStringCollection(MRJobConfig.CLASSPATH_FILES);
+    if (list.size() == 0) {
+      return null;
+    }
+    Path[] paths = new Path[list.size()];
+    for (int i = 0; i < list.size(); i++) {
+      paths[i] = new Path(list.get(i));
+    }
+    return paths;
   }
 
   /**
@@ -376,7 +498,7 @@ public class JobContextImpl implements JobContext {
    * @return a string array of timestamps 
    */
   public String[] getArchiveTimestamps() {
-    return toTimestampStrs(DistributedCache.getArchiveTimestamps(conf));
+    return toTimestampStrs(getArchiveTimestamps(conf));
   }
 
   /**
@@ -385,7 +507,7 @@ public class JobContextImpl implements JobContext {
    * @return a string array of timestamps 
    */
   public String[] getFileTimestamps() {
-    return toTimestampStrs(DistributedCache.getFileTimestamps(conf));
+    return toTimestampStrs(getFileTimestamps(conf));
   }
 
   /** 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java
index 718dbee0144..a71550bce8a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MRCaching.java
@@ -26,15 +26,8 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.*;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 import java.net.URI;
@@ -62,8 +55,8 @@ public class MRCaching {
     public void configure(JobConf jconf) {
       conf = jconf;
       try {
-        Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+        Path[] localArchives = JobContextImpl.getLocalCacheArchives(conf);
+        Path[] localFiles = JobContextImpl.getLocalCacheFiles(conf);
         // read the cached files (unzipped, unjarred and text)
         // and put it into a single file TEST_ROOT_DIR/test.txt
         String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
@@ -254,7 +247,7 @@ public class MRCaching {
     uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
     uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
     uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
-    DistributedCache.addCacheFile(uris[0], conf);
+    Job.addCacheFile(uris[0], conf);
 
     // Save expected file sizes
     long[] fileSizes = new long[1];
@@ -262,7 +255,7 @@ public class MRCaching {
 
     long[] archiveSizes = new long[5]; // track last 5
     for (int i = 1; i < 6; i++) {
-      DistributedCache.addCacheArchive(uris[i], conf);
+      Job.addCacheArchive(uris[i], conf);
       archiveSizes[i-1] = // starting with second archive
         fs.getFileStatus(new Path(uris[i].getPath())).getLen();
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
index 0f3916824ae..10cb9b85b5b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRAppWithCombiner.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.Job;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -111,7 +111,7 @@ public class TestMRAppWithCombiner {
     conf.setCombinerClass(MyCombinerToCheckReporter.class);
     //conf.setJarByClass(MyCombinerToCheckReporter.class);
     conf.setReducerClass(IdentityReducer.class);
-    DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
+    Job.addFileToClassPath(TestMRJobs.APP_JAR, conf, TestMRJobs.APP_JAR.getFileSystem(conf));
     conf.setOutputCommitter(CustomOutputCommitter.class);
     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputKeyClass(LongWritable.class);
diff --git a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java
index 551a5f2dc40..4f9d820c124 100644
--- a/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java
+++ b/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -969,10 +970,12 @@ public class StreamJob implements Tool {
         fail(LINK_URI);
     }
     // set the jobconf for the caching parameters
-    if (cacheArchives != null)
-      DistributedCache.setCacheArchives(archiveURIs, jobConf_);
-    if (cacheFiles != null)
-      DistributedCache.setCacheFiles(fileURIs, jobConf_);
+    if (cacheArchives != null) {
+      Job.setCacheArchives(archiveURIs, jobConf_);
+    }
+    if (cacheFiles != null) {
+      Job.setCacheFiles(fileURIs, jobConf_);
+    }
 
     if (verbose_) {
       listJobConfProperties();

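The new Job#addCacheFile javadoc above also documents the trailing-"*"
wildcard behavior. A hedged usage sketch of that behavior (the HDFS path is
illustrative only):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class WildcardCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Per the javadoc: a path ending in "*" localizes the entire parent
        // directory, with links created from the job's working directory to
        // each file in it; whether the files are shared across jobs is then
        // decided solely by the parent directories' permissions.
        Job.addCacheFile(new URI("hdfs:///apps/shared/libs/*"), conf);

        // The helper appends the URI to the MRJobConfig.CACHE_FILES property.
        System.out.println(conf.get(MRJobConfig.CACHE_FILES));
      }
    }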
