Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Sat Jul 12 02:24:40 2014 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -359,6 +360,15 @@ public abstract class FileInputFormat<K, String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** * Generate the list of files and make them into FileSplits. 
@@ -392,17 +402,20 @@ public abstract class FileInputFormat<K, while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable - splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), + blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Sat Jul 12 02:24:40 2014 @@ -22,11 +22,13 @@ import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +43,7 @@ public class FileSplit extends InputSpli private long start; private long length; private String[] hosts; + private SplitLocationInfo[] hostInfos; public FileSplit() {} @@ -57,6 +60,31 @@ public class FileSplit extends InputSpli this.length = length; this.hosts = hosts; } + + /** Constructs a split with host and cached-blocks information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param 
length the number of bytes in the file to process + * @param hosts the list of hosts containing the block + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + this(file, start, length, hosts); + hostInfos = new SplitLocationInfo[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(hosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory); + } + } /** The file containing this split's data. */ public Path getPath() { return file; } @@ -98,4 +126,10 @@ public class FileSplit extends InputSpli return this.hosts; } } + + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java Sat Jul 12 02:24:40 2014 @@ -38,6 +38,21 @@ public class HostUtil { httpPort + "/tasklog?attemptid=" + taskAttemptID); } + /** + * 
Always throws {@link RuntimeException} because this method is not + * supposed to be called at runtime. This method is only for keeping + * binary compatibility with Hive 0.13. See MAPREDUCE-5830 for the details. + * @deprecated Use {@link #getTaskLogUrl(String, String, String, String)} + * to construct the taskLogUrl. + */ + @Deprecated + public static String getTaskLogUrl(String taskTrackerHostName, + String httpPort, String taskAttemptID) { + throw new RuntimeException( + "This method is not supposed to be called at runtime. " + + "Use HostUtil.getTaskLogUrl(String, String, String, String) instead."); + } + public static String convertTrackerNameToHostName(String trackerName) { // Ugly! // Convert the trackerName to its host name Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Sat Jul 12 02:24:40 2014 @@ -83,6 +83,16 @@ </property> <property> + <name>mapreduce.job.reducer.preempt.delay.sec</name> + <value>0</value> + <description>The threshold in terms of seconds after which an unsatisfied mapper + request triggers reducer preemption to free space. Default 0 implies that the + reduces should be preempted immediately after allocation if there is currently no + room for newly allocated mappers. 
+ </description> +</property> + +<property> <name>mapreduce.job.max.split.locations</name> <value>10</value> <description>The max number of block locations to store for each split for Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1603348-1605891 Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Sat Jul 12 02:24:40 2014 @@ -103,6 +103,29 @@ public class TestFileInputFormat { } @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1); + String[] locations = 
splits[0].getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits[0].getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } + + @Test public void testListStatusSimple() throws IOException { Configuration conf = new Configuration(); conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); @@ -223,8 +246,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ 
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Sat Jul 12 02:24:40 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.After; @@ -139,6 +140,28 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } + + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + Job job = Job.getInstance(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + List<InputSplit> splits = fileInputFormat.getSplits(job); + String[] locations = splits.get(0).getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? 
+ locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } @Test public void testListStatusSimple() throws IOException { @@ -402,9 +425,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; - } + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014 @@ -372,10 +372,13 @@ public class TestFixedLengthInputFormat format.getRecordReader(split, job, voidReporter); LongWritable key = reader.createKey(); BytesWritable value = reader.createValue(); 
- while (reader.next(key, value)) { - result.add(new String(value.getBytes(), 0, value.getLength())); + try { + while (reader.next(key, value)) { + result.add(new String(value.getBytes(), 0, value.getLength())); + } + } finally { + reader.close(); } - reader.close(); return result; } Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Sat Jul 12 02:24:40 2014 @@ -183,6 +183,8 @@ public class TestPipeApplication { output.setWriter(wr); conf.set(Submitter.PRESERVE_COMMANDFILE, "true"); + initStdOut(conf); + Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>( conf, rReader, output, reporter, IntWritable.class, Text.class); application.getDownlink().flush(); Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java Sat Jul 12 02:24:40 2014 @@ -417,15 +417,18 @@ public class TestFixedLengthInputFormat new MapContextImpl<LongWritable, BytesWritable, LongWritable, BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(), reader, null, null, MapReduceTestUtil.createDummyReporter(), split); - reader.initialize(split, mcontext); LongWritable key; BytesWritable value; - while (reader.nextKeyValue()) { - key = reader.getCurrentKey(); - value = reader.getCurrentValue(); - result.add(new String(value.getBytes(), 0, value.getLength())); + try { + reader.initialize(split, mcontext); + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + result.add(new String(value.getBytes(), 0, value.getLength())); + } + } finally { + reader.close(); } - reader.close(); return result; }