This is an automated email from the ASF dual-hosted git repository. liuml07 pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new be42149 MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319) be42149 is described below commit be421490dadd61c4bcbd23bdb23bed408607ff22 Author: zz <zz.git...@gmail.com> AuthorDate: Tue Sep 22 11:57:36 2020 -0700 MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319) Contributed by Zhenzhao Wang <zhenzhaow...@gmail.com> Signed-off-by: Mingliang Liu <lium...@apache.org> --- .../hadoop/mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../mapreduce/v2/app/job/impl/TestJobImpl.java | 23 ++++++++++++ .../main/java/org/apache/hadoop/mapreduce/Job.java | 41 ++++++++++++---------- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4995120..b688f4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1421,7 +1421,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, * be set up to false. In that way, the NMs that host the task containers * won't try to upload the resources to shared cache. */ - private static void cleanupSharedCacheUploadPolicies(Configuration conf) { + @VisibleForTesting + static void cleanupSharedCacheUploadPolicies(Configuration conf) { Map<String, Boolean> emap = Collections.emptyMap(); Job.setArchiveSharedCacheUploadPolicies(conf, emap); Job.setFileSharedCacheUploadPolicies(conf, emap); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 1827ce4..d342a3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -989,6 +990,28 @@ public class TestJobImpl { Assert.assertEquals(updatedPriority, jobPriority); } + @Test + public void testCleanupSharedCacheUploadPolicies() { + Configuration config = new Configuration(); + Map<String, Boolean> archivePolicies = new HashMap<>(); + archivePolicies.put("archive1", true); + archivePolicies.put("archive2", true); + Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies); + Map<String, Boolean> filePolicies = new HashMap<>(); + filePolicies.put("file1", true); + filePolicies.put("jar1", true); + Job.setFileSharedCacheUploadPolicies(config, filePolicies); + Assert.assertEquals( + 2, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 2, Job.getFileSharedCacheUploadPolicies(config).size()); + JobImpl.cleanupSharedCacheUploadPolicies(config); + Assert.assertEquals( + 0, Job.getArchiveSharedCacheUploadPolicies(config).size()); + Assert.assertEquals( + 0, Job.getFileSharedCacheUploadPolicies(config).size()); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = SystemClock.getInstance(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 493a221..c276ec0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -1448,26 +1448,29 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable { */ private static void setSharedCacheUploadPolicies(Configuration conf, Map<String, Boolean> policies, boolean areFiles) { - if (policies != null) { - StringBuilder sb = new StringBuilder(); - Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator(); - Map.Entry<String, Boolean> e; - if (it.hasNext()) { - e = it.next(); - sb.append(e.getKey() + DELIM + e.getValue()); - } else { - // policies is an empty map, just skip setting the parameter - return; - } - while (it.hasNext()) { - e = it.next(); - sb.append("," + e.getKey() + DELIM + e.getValue()); - } - String confParam = - areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES - : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; - conf.set(confParam, sb.toString()); + String confParam = areFiles ? + MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES : + MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + // If no policy is provided, we will reset the config by setting an empty + // string value. In other words, cleaning up existing policies. This is + // useful when we try to clean up shared cache upload policies for + // non-application master tasks. See MAPREDUCE-7294 for details. + if (policies == null || policies.size() == 0) { + conf.set(confParam, ""); + return; + } + StringBuilder sb = new StringBuilder(); + Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator(); + Map.Entry<String, Boolean> e; + if (it.hasNext()) { + e = it.next(); + sb.append(e.getKey() + DELIM + e.getValue()); + } + while (it.hasNext()) { + e = it.next(); + sb.append("," + e.getKey() + DELIM + e.getValue()); } + conf.set(confParam, sb.toString()); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org