MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public API in DistCp. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bab6209c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bab6209c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bab6209c Branch: refs/heads/HDFS-7285 Commit: bab6209c170d1127680f8d0e975e2e54e9c63ccc Parents: ff1b358 Author: Vinod Kumar Vavilapalli <vino...@apache.org> Authored: Tue Mar 3 16:28:22 2015 -0800 Committer: Jing Zhao <ji...@apache.org> Committed: Mon Mar 9 13:11:23 2015 -0700 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../java/org/apache/hadoop/tools/DistCp.java | 47 +++++++++++++++----- 2 files changed, 39 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab6209c/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7a2eff3..b2ae9d9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -320,6 +320,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-5612. Add javadoc for TaskCompletionEvent.Status. (Chris Palmer via aajisaka) + MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public + API in DistCp. (Jing Zhao via vinodkv) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item http://git-wip-us.apache.org/repos/asf/hadoop/blob/bab6209c/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 28535a7..b80aeb8 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -20,6 +20,8 @@ package org.apache.hadoop.tools; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -51,12 +53,14 @@ import com.google.common.annotations.VisibleForTesting; * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune * behaviour. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public class DistCp extends Configured implements Tool { /** - * Priority of the ResourceManager shutdown hook. + * Priority of the shutdown hook. */ - public static final int SHUTDOWN_HOOK_PRIORITY = 30; + static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final Log LOG = LogFactory.getLog(DistCp.class); @@ -66,7 +70,7 @@ public class DistCp extends Configured implements Tool { private static final String PREFIX = "_distcp"; private static final String WIP_PREFIX = "._WIP_"; private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; - public static final Random rand = new Random(); + static final Random rand = new Random(); private boolean submitted; private FileSystem jobFS; @@ -90,7 +94,7 @@ public class DistCp extends Configured implements Tool { * To be used with the ToolRunner. Not for public consumption. */ @VisibleForTesting - public DistCp() {} + DistCp() {} /** * Implementation of Tool::run(). Orchestrates the copy of source file(s) @@ -100,6 +104,7 @@ public class DistCp extends Configured implements Tool { * @param argv List of arguments passed to DistCp, from the ToolRunner. * @return On success, it returns 0. Else, -1. */ + @Override public int run(String[] argv) { if (argv.length < 1) { OptionsParser.usage(); @@ -145,9 +150,21 @@ public class DistCp extends Configured implements Tool { * @throws Exception */ public Job execute() throws Exception { + Job job = createAndSubmitJob(); + + if (inputOptions.shouldBlock()) { + waitForJobCompletion(job); + } + return job; + } + + /** + * Create and submit the mapreduce job. + * @return The mapreduce job object that has been submitted + */ + public Job createAndSubmitJob() throws Exception { assert inputOptions != null; assert getConf() != null; - Job job = null; try { synchronized(this) { @@ -169,16 +186,24 @@ public class DistCp extends Configured implements Tool { String jobID = job.getJobID().toString(); job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); - LOG.info("DistCp job-id: " + jobID); - if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) { - throw new IOException("DistCp failure: Job " + jobID + " has failed: " - + job.getStatus().getFailureInfo()); - } + return job; } /** + * Wait for the given job to complete. + * @param job the given mapreduce job that has already been submitted + */ + public void waitForJobCompletion(Job job) throws Exception { + assert job != null; + if (!job.waitForCompletion(true)) { + throw new IOException("DistCp failure: Job " + job.getJobID() + + " has failed: " + job.getStatus().getFailureInfo()); + } + } + + /** * Set targetPathExists in both inputOptions and job config, * for the benefit of CopyCommitter */ @@ -436,7 +461,7 @@ public class DistCp extends Configured implements Tool { private static class Cleanup implements Runnable { private final DistCp distCp; - public Cleanup(DistCp distCp) { + Cleanup(DistCp distCp) { this.distCp = distCp; }