Author: szetszwo
Date: Thu May 9 22:46:39 2013
New Revision: 1480824

URL: http://svn.apache.org/r1480824
Log:
Merge r1480440 through r1480820 from trunk.
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu May 9 22:46:39 2013
@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
     binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5157. Bring back old sampler related code so that we can support
+    binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
@@ -383,7 +386,19 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5204. Handling YarnRemoteException separately from IOException
     in MR app after YARN-629. (Xuan Gong via vinodkv)
 
-Release 2.0.4-alpha - UNRELEASED
+    MAPREDUCE-5209. Fix units in a ShuffleScheduler log message.
+    (Tsuyoshi OZAWA via cdouglas)
+
+    MAPREDUCE-5212. Handling YarnRemoteException separately from IOException in
+    MR App's use of ClientRMProtocol after YARN-631. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in
+    MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-4942. mapreduce.Job has a bunch of methods that throw
+    InterruptedException so its incompatible with MR1. (rkanter via tucu)
+
+Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
 
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1480440-1480820
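Of the entries above, MAPREDUCE-4942 motivates most of the Job.java and JobClient.java hunks in this merge: hadoop-1 application code calls the Job status methods without catching InterruptedException, so the 2.x signatures must drop it to stay compatible. A minimal sketch of such an MR1-style call site (the monitor class is hypothetical, not part of this patch) that compiles against the new signatures:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;

    public class JobMonitor {
      // With MAPREDUCE-4942 these Job methods declare only IOException,
      // so an MR1-era caller like this compiles unchanged. A real monitor
      // would sleep between polls.
      public static void waitAndReport(Job job) throws IOException {
        while (!job.isComplete()) {
          System.out.printf("map %.0f%% reduce %.0f%%%n",
              job.mapProgress() * 100, job.reduceProgress() * 100);
        }
        System.out.println(job.isSuccessful() ? "Job succeeded" : "Job failed");
      }
    }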
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu May 9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -144,7 +145,8 @@ public abstract class RMContainerRequest
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
-  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException {
+  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException,
+      IOException {
     AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
         applicationAttemptId, lastResponseID, super.getApplicationProgress(),
         new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
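As with the other YARN-630/631 follow-ups merged here, callers of makeRemoteRequest() now have two failure modes to handle instead of one. A sketch of a call site under that assumption (the surrounding method and log messages are illustrative, not taken from this patch):

    try {
      AllocateResponse response = makeRemoteRequest();
      // ... process newly allocated/completed containers ...
    } catch (YarnRemoteException e) {
      // Failure reported by the ResourceManager itself.
      LOG.error("Allocate request rejected by the RM", e);
    } catch (IOException e) {
      // RPC/transport-level failure, surfaced separately after YARN-630.
      LOG.error("Could not contact the RM", e);
    }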
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu May 9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -202,7 +203,7 @@ public class MRAppBenchmark {
 
         public RegisterApplicationMasterResponse registerApplicationMaster(
             RegisterApplicationMasterRequest request)
-            throws YarnRemoteException {
+            throws YarnRemoteException, IOException {
           RegisterApplicationMasterResponse response =
               Records.newRecord(RegisterApplicationMasterResponse.class);
           response.setMinimumResourceCapability(BuilderUtils
@@ -215,7 +216,7 @@ public class MRAppBenchmark {
         @Override
         public FinishApplicationMasterResponse finishApplicationMaster(
             FinishApplicationMasterRequest request)
-            throws YarnRemoteException {
+            throws YarnRemoteException, IOException {
           FinishApplicationMasterResponse response =
               Records.newRecord(FinishApplicationMasterResponse.class);
           return response;
@@ -223,7 +224,7 @@ public class MRAppBenchmark {
 
         @Override
         public AllocateResponse allocate(AllocateRequest request)
-            throws YarnRemoteException {
+            throws YarnRemoteException, IOException {
           AllocateResponse response =
               Records.newRecord(AllocateResponse.class);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Thu May 9 22:46:39 2013
@@ -100,6 +100,7 @@ public class TestLocalContainerAllocator
       when(scheduler.allocate(isA(AllocateRequest.class)))
         .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
     } catch (YarnRemoteException e) {
+    } catch (IOException e) {
     }
     return scheduler;
   }
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Thu May 9 22:46:39 2013
@@ -209,11 +209,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float mapProgress() throws IOException {
-      try {
-        return job.mapProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.mapProgress();
     }
 
     /**
@@ -221,11 +217,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float reduceProgress() throws IOException {
-      try {
-        return job.reduceProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.reduceProgress();
     }
 
     /**
@@ -245,33 +237,21 @@ public class JobClient extends CLI {
      * completed.
      */
     public float setupProgress() throws IOException {
-      try {
-        return job.setupProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.setupProgress();
     }
 
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
-      try {
-        return job.isComplete();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isComplete();
     }
 
     /**
      * True iff job completed successfully.
      */
     public synchronized boolean isSuccessful() throws IOException {
-      try {
-        return job.isSuccessful();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isSuccessful();
     }
 
     /**
@@ -302,11 +282,7 @@ public class JobClient extends CLI {
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {
-      try {
-        job.killJob();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      job.killJob();
     }
 
 
@@ -331,14 +307,10 @@ public class JobClient extends CLI {
      */
     public synchronized void killTask(TaskAttemptID taskId,
         boolean shouldFail) throws IOException {
-      try {
-        if (shouldFail) {
-          job.failTask(taskId);
-        } else {
-          job.killTask(taskId);
-        }
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      if (shouldFail) {
+        job.failTask(taskId);
+      } else {
+        job.killTask(taskId);
       }
     }
 
@@ -378,16 +350,12 @@ public class JobClient extends CLI {
      * Returns the counters for this job
      */
     public Counters getCounters() throws IOException {
-      try {
-        Counters result = null;
-        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
-        if(temp != null) {
-          result = Counters.downgrade(temp);
-        }
-        return result;
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      Counters result = null;
+      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
+      if(temp != null) {
+        result = Counters.downgrade(temp);
       }
+      return result;
     }
 
     @Override
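The deleted try/catch blocks are not lost behavior: as the Job.java hunks below show, the InterruptedException-to-IOException wrapping moves down into mapreduce.Job, so these mapred.JobClient wrappers collapse into plain delegation. Schematically, for mapProgress():

    // Before: every JobClient wrapper caught and re-wrapped.
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();   // used to throw InterruptedException
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    // After: Job.mapProgress() itself declares only IOException.
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }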
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java Thu May 9 22:46:39 2013
@@ -19,10 +19,18 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 
 @InterfaceAudience.Public
@@ -30,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
 public class InputSampler<K,V> extends
   org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
   public InputSampler(JobConf conf) {
     super(conf);
   }
@@ -38,4 +48,219 @@ public class InputSampler<K,V> extends
       throws IOException, ClassNotFoundException, InterruptedException {
     writePartitionFile(new Job(job), sampler);
   }
+
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> extends
+    org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      super(numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      super(freq, numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      super(freq, maxSplitsSampled);
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
 }
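The point of restoring these classes is that a hadoop-1 total-order sort job keeps working unmodified against the old org.apache.hadoop.mapred API. A sketch of that usage pattern (the configuration values and Text key/value types are illustrative; assumes the old-API TotalOrderPartitioner from org.apache.hadoop.mapred.lib):

    JobConf job = new JobConf(conf);
    job.setNumReduceTasks(4);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/partitions"));

    // Sample each key with probability 0.01, keeping at most 10000 samples
    // drawn from at most 10 splits, and write the reducer boundary keys.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
    JobClient.runJob(job);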
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu May 9 22:46:39 2013
@@ -296,7 +296,7 @@ public class Job extends JobContextImpl
    * it, if necessary
    */
   synchronized void ensureFreshStatus()
-      throws IOException, InterruptedException {
+      throws IOException {
     if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
       updateStatus();
     }
@@ -306,13 +306,18 @@ public class Job extends JobContextImpl
    * immediately
    * @throws IOException
    */
-  synchronized void updateStatus() throws IOException, InterruptedException {
-    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
-      @Override
-      public JobStatus run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobStatus(status.getJobID());
-      }
-    });
+  synchronized void updateStatus() throws IOException {
+    try {
+      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+        @Override
+        public JobStatus run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobStatus(status.getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
     if (this.status == null) {
       throw new IOException("Job status not available ");
     }
@@ -537,7 +542,7 @@ public class Job extends JobContextImpl
    * @return the progress of the job's map-tasks.
    * @throws IOException
    */
-  public float mapProgress() throws IOException, InterruptedException {
+  public float mapProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getMapProgress();
@@ -550,7 +555,7 @@ public class Job extends JobContextImpl
    * @return the progress of the job's reduce-tasks.
    * @throws IOException
    */
-  public float reduceProgress() throws IOException, InterruptedException {
+  public float reduceProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getReduceProgress();
@@ -576,7 +581,7 @@ public class Job extends JobContextImpl
    * @return the progress of the job's setup-tasks.
    * @throws IOException
    */
-  public float setupProgress() throws IOException, InterruptedException {
+  public float setupProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getSetupProgress();
@@ -589,7 +594,7 @@ public class Job extends JobContextImpl
    * @return <code>true</code> if the job is complete, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isComplete() throws IOException, InterruptedException {
+  public boolean isComplete() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.isJobComplete();
@@ -601,7 +606,7 @@ public class Job extends JobContextImpl
    * @return <code>true</code> if the job succeeded, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isSuccessful() throws IOException, InterruptedException {
+  public boolean isSuccessful() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.getState() == JobStatus.State.SUCCEEDED;
@@ -613,9 +618,14 @@ public class Job extends JobContextImpl
    *
    * @throws IOException
    */
-  public void killJob() throws IOException, InterruptedException {
+  public void killJob() throws IOException {
     ensureState(JobState.RUNNING);
-    cluster.getClient().killJob(getJobID());
+    try {
+      cluster.getClient().killJob(getJobID());
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -673,7 +683,7 @@ public class Job extends JobContextImpl
     try {
       return getTaskCompletionEvents(startFrom, 10);
     } catch (InterruptedException ie) {
-      throw new RuntimeException(ie);
+      throw new IOException(ie);
    }
   }
 
@@ -684,13 +694,18 @@ public class Job extends JobContextImpl
    * @throws IOException
    */
   public boolean killTask(final TaskAttemptID taskId)
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, false);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, false);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -700,14 +715,19 @@ public class Job extends JobContextImpl
    * @throws IOException
    */
   public boolean failTask(final TaskAttemptID taskId)
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, true);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, true);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -718,14 +738,19 @@ public class Job extends JobContextImpl
    * @throws IOException
    */
   public Counters getCounters()
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
-      @Override
-      public Counters run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobCounters(getJobID());
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
+        @Override
+        public Counters run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobCounters(getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Thu May 9 22:46:39 2013
@@ -96,8 +96,8 @@ public class InputSampler<K,V> extends C
    */
   public static class SplitSampler<K,V> implements Sampler<K,V> {
 
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a SplitSampler sampling <em>all</em> splits.
@@ -157,9 +157,9 @@ public class InputSampler<K,V> extends C
    * each split.
    */
   public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected double freq;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new RandomSampler sampling <em>all</em> splits.
@@ -249,8 +249,8 @@ public class InputSampler<K,V> extends C
    * Useful for sorted data.
    */
   public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
+    protected final double freq;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new IntervalSampler sampling <em>all</em> splits.
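The only change here is field visibility: private becomes protected so that the restored mapred.lib samplers above, which subclass these new-API classes, can read the sampling parameters directly rather than requiring new accessors. For example, the old-API SplitSampler.getSample() in the previous hunk relies on exactly this:

    // From mapred.lib.InputSampler.SplitSampler.getSample() above: both
    // fields are inherited from the new-API SplitSampler and must be at
    // least protected for this to compile.
    ArrayList<K> samples = new ArrayList<K>(numSamples);
    int splitsToSample = Math.min(maxSplitsSampled, splits.length);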
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Thu May 9 22:46:39 2013
@@ -359,7 +359,7 @@ class ShuffleScheduler<K,V> {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
-        (System.currentTimeMillis()-shuffleStart.get()) + "s");
+        (System.currentTimeMillis()-shuffleStart.get()) + "ms");
   }
 
   public synchronized void resetKnownMaps() {

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu May 9 22:46:39 2013
@@ -137,7 +137,7 @@ public class ClientServiceDelegate {
     }
   }
 
-  private MRClientProtocol getProxy() throws YarnRemoteException {
+  private MRClientProtocol getProxy() throws YarnRemoteException, IOException {
     if (realProxy != null) {
       return realProxy;
     }
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu May 9 22:46:39 2013
@@ -362,7 +362,7 @@ public class TestClientServiceDelegate {
   }
 
   private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
-      int noOfRetries) throws YarnRemoteException {
+      int noOfRetries) throws YarnRemoteException, IOException {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
         !isAMReachableFromClient);
@@ -429,7 +429,8 @@ public class TestClientServiceDelegate {
         "N/A", 0.0f);
   }
 
-  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException,
+      IOException {
     ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
     when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
     return rm;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Thu May 9 22:46:39 2013
@@ -17,23 +17,26 @@
  */
 package org.apache.hadoop.mapreduce.lib.partition;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
 
 public class TestInputSampler {
 
@@ -47,6 +50,24 @@ public class TestInputSampler {
     public int getInit() { return i; }
   }
 
+  static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+    private int i;
+    MapredSequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
   static class TestInputSamplerIF
       extends InputFormat<IntWritable,NullWritable> {
 
@@ -90,6 +111,71 @@ public class TestInputSampler {
 
   }
 
+  static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+      org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+    TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      super(maxDepth, numSplits, splitInit);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+        int numSplits) throws IOException {
+      List<InputSplit> splits = null;
+      try {
+        splits = getSplits(Job.getInstance(job));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      org.apache.hadoop.mapred.InputSplit[] retVals =
+          new org.apache.hadoop.mapred.InputSplit[splits.size()];
+      for (int i = 0; i < splits.size(); ++i) {
+        MapredSequentialSplit split = new MapredSequentialSplit(
+            ((SequentialSplit) splits.get(i)).getInit());
+        retVals[i] = split;
+      }
+      return retVals;
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+        getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+      return new org.apache.hadoop.mapred.RecordReader
+          <IntWritable, NullWritable>() {
+        private final IntWritable i =
+            new IntWritable(((MapredSequentialSplit)split).getInit());
+        private int maxVal = i.get() + maxDepth + 1;
+
+        @Override
+        public boolean next(IntWritable key, NullWritable value)
+            throws IOException {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable createKey() {
+          return new IntWritable(i.get());
+        }
+        @Override
+        public NullWritable createValue() {
+          return NullWritable.get();
+        }
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
   /**
    * Verify SplitSampler contract, that an equal number of records are taken
    * from the first splits.
   */
@@ -119,6 +205,36 @@ public class TestInputSampler {
   }
 
   /**
+   * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+   * back for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+            <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Object[] samples = sampler.getSample(
+        new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+        new JobConf());
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      // mapred.lib.InputSampler.SplitSampler has a sampling step
+      assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
    * Verify IntervalSampler contract, that samples are taken at regular
    * intervals from the given splits.
    */
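To see why i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE) is the expected key in the assertion above: with 15 splits and a cap of NUM_SPLITS = 5, SplitSampler visits splits 0, 3, 6, 9 and 12 (splitStep = 15/5 = 3), whose inits are 0, 15, 30, 45 and 60, and keeps the first NUM_SAMPLES/5 = 5 keys of each, i.e. init, init+1, ..., init+4. After sorting, sample i is key i % 5 of sampled split i / 5; sample 7, for instance, is 2 + 15 * 1 = 17.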
@@ -146,4 +262,33 @@ public class TestInputSampler {
     }
   }
 
+  /**
+   * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+   * for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+            <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+        NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i,
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
 }