Author: szetszwo
Date: Fri Jan 25 03:13:36 2013
New Revision: 1438306

URL: http://svn.apache.org/viewvc?rev=1438306&view=rev
Log:
Merge r1437841 through r1438305 from trunk.
Added: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java - copied unchanged from r1438305, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1437841-1438305 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1438306&r1=1438305&r2=1438306&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Jan 25 03:13:36 2013 @@ -268,6 +268,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken. (Junping Du via sseth) + MAPREDUCE-2264. Job status exceeds 100% in some cases. + (devaraj.k and sandyr via tucu) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1437841-1438305 Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1437841-1438305 Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1438306&r1=1438305&r2=1438306&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Fri Jan 25 03:13:36 2013 @@ -218,6 +218,7 @@ public class Merger { CompressionCodec codec = null; long segmentOffset = 0; long segmentLength = -1; + long rawDataLength = -1; Counters.Counter mapOutputsCounter = null; @@ -234,6 +235,15 @@ public class Merger { this(conf, fs, file, 0, 
fs.getFileStatus(file).getLen(), codec, preserve, mergedMapOutputsCounter); } + + public Segment(Configuration conf, FileSystem fs, Path file, + CompressionCodec codec, boolean preserve, + Counters.Counter mergedMapOutputsCounter, long rawDataLength) + throws IOException { + this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, + mergedMapOutputsCounter); + this.rawDataLength = rawDataLength; + } public Segment(Configuration conf, FileSystem fs, Path file, long segmentOffset, long segmentLength, @@ -261,6 +271,11 @@ public class Merger { public Segment(Reader<K, V> reader, boolean preserve) { this(reader, preserve, null); } + + public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) { + this(reader, preserve, null); + this.rawDataLength = rawDataLength; + } public Segment(Reader<K, V> reader, boolean preserve, Counters.Counter mapOutputsCounter) { @@ -300,6 +315,10 @@ public class Merger { segmentLength : reader.getLength(); } + public long getRawDataLength() { + return (rawDataLength > 0) ? rawDataLength : getLength(); + } + boolean nextRawKey() throws IOException { return reader.nextRawKey(key); } @@ -633,7 +652,7 @@ public class Merger { totalBytesProcessed = 0; totalBytes = 0; for (int i = 0; i < segmentsToMerge.size(); i++) { - totalBytes += segmentsToMerge.get(i).getLength(); + totalBytes += segmentsToMerge.get(i).getRawDataLength(); } } if (totalBytes != 0) //being paranoid @@ -702,7 +721,7 @@ public class Merger { // size will match(almost) if combiner is not called in merge. 
long inputBytesOfThisMerge = totalBytesProcessed - bytesProcessedInPrevMerges; - totalBytes -= inputBytesOfThisMerge - tempSegment.getLength(); + totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength(); if (totalBytes != 0) { progPerByte = 1.0f / (float)totalBytes; } @@ -768,7 +787,7 @@ public class Merger { for (int i = 0; i < numSegments; i++) { // Not handling empty segments here assuming that it would not affect // much in calculation of mergeProgress. - segmentSizes.add(segments.get(i).getLength()); + segmentSizes.add(segments.get(i).getRawDataLength()); } // If includeFinalMerge is true, allow the following while loop iterate Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1438306&r1=1438305&r2=1438306&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Fri Jan 25 03:13:36 2013 @@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> impl new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>()); private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger; - Set<Path> onDiskMapOutputs = new TreeSet<Path>(); + Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>(); private final OnDiskMerger onDiskMerger; private final 
long memoryLimit; @@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> impl inMemoryMergedMapOutputs.size()); } - public synchronized void closeOnDiskFile(Path file) { + public synchronized void closeOnDiskFile(CompressAwarePath file) { onDiskMapOutputs.add(file); if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { @@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> impl List<InMemoryMapOutput<K, V>> memory = new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs); memory.addAll(inMemoryMapOutputs); - List<Path> disk = new ArrayList<Path>(onDiskMapOutputs); + List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs); return finalMerge(jobConf, rfs, memory, disk); } @@ -456,6 +456,7 @@ public class MergeManagerImpl<K, V> impl codec, null); RawKeyValueIterator rIter = null; + CompressAwarePath compressAwarePath; try { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); @@ -474,6 +475,8 @@ public class MergeManagerImpl<K, V> impl combineCollector.setWriter(writer); combineAndSpill(rIter, reduceCombineInputCounter); } + compressAwarePath = new CompressAwarePath(outputPath, + writer.getRawLength()); writer.close(); LOG.info(reduceId + @@ -489,12 +492,12 @@ public class MergeManagerImpl<K, V> impl } // Note the output of the merge - closeOnDiskFile(outputPath); + closeOnDiskFile(compressAwarePath); } } - private class OnDiskMerger extends MergeThread<Path,K,V> { + private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> { public OnDiskMerger(MergeManagerImpl<K, V> manager) { super(manager, Integer.MAX_VALUE, exceptionReporter); @@ -503,7 +506,7 @@ public class MergeManagerImpl<K, V> impl } @Override - public void merge(List<Path> inputs) throws IOException { + public void merge(List<CompressAwarePath> inputs) throws IOException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); @@ -518,8 +521,8 @@ public class MergeManagerImpl<K, V> 
impl " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. - for (Path file : inputs) { - approxOutputSize += localFS.getFileStatus(file).getLen(); + for (CompressAwarePath file : inputs) { + approxOutputSize += localFS.getFileStatus(file.getPath()).getLen(); } // add the checksum length @@ -536,6 +539,7 @@ public class MergeManagerImpl<K, V> impl (Class<V>) jobConf.getMapOutputValueClass(), codec, null); RawKeyValueIterator iter = null; + CompressAwarePath compressAwarePath; Path tmpDir = new Path(reduceId.toString()); try { iter = Merger.merge(jobConf, rfs, @@ -548,13 +552,15 @@ public class MergeManagerImpl<K, V> impl mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); + compressAwarePath = new CompressAwarePath(outputPath, + writer.getRawLength()); writer.close(); } catch (IOException e) { localFS.delete(outputPath, true); throw e; } - closeOnDiskFile(outputPath); + closeOnDiskFile(compressAwarePath); LOG.info(reduceId + " Finished merging " + inputs.size() + @@ -653,7 +659,7 @@ public class MergeManagerImpl<K, V> impl private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List<InMemoryMapOutput<K,V>> inMemoryMapOutputs, - List<Path> onDiskMapOutputs + List<CompressAwarePath> onDiskMapOutputs ) throws IOException { LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + @@ -712,7 +718,8 @@ public class MergeManagerImpl<K, V> impl try { Merger.writeFile(rIter, writer, reporter, job); // add to list of final disk outputs. 
- onDiskMapOutputs.add(outputPath); + onDiskMapOutputs.add(new CompressAwarePath(outputPath, + writer.getRawLength())); } catch (IOException e) { if (null != outputPath) { try { @@ -742,15 +749,19 @@ public class MergeManagerImpl<K, V> impl // segments on disk List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>(); long onDiskBytes = inMemToDiskBytes; - Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); - for (Path file : onDisk) { - onDiskBytes += fs.getFileStatus(file).getLen(); - LOG.debug("Disk file: " + file + " Length is " + - fs.getFileStatus(file).getLen()); - diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, + long rawBytes = inMemToDiskBytes; + CompressAwarePath[] onDisk = onDiskMapOutputs.toArray( + new CompressAwarePath[onDiskMapOutputs.size()]); + for (CompressAwarePath file : onDisk) { + long fileLength = fs.getFileStatus(file.getPath()).getLen(); + onDiskBytes += fileLength; + rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength; + + LOG.debug("Disk file: " + file + " Length is " + fileLength); + diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs, (file.toString().endsWith( Task.MERGED_OUTPUT_PREFIX) ? 
- null : mergedMapOutputsCounter) + null : mergedMapOutputsCounter), file.getRawDataLength() )); } LOG.info("Merging " + onDisk.length + " files, " + @@ -786,7 +797,7 @@ public class MergeManagerImpl<K, V> impl return diskMerge; } finalSegments.add(new Segment<K,V>( - new RawKVIteratorReader(diskMerge, onDiskBytes), true)); + new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes)); } return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, @@ -794,4 +805,24 @@ public class MergeManagerImpl<K, V> impl null); } + + static class CompressAwarePath + { + private long rawDataLength; + + private Path path; + + public CompressAwarePath(Path path, long rawDataLength) { + this.path = path; + this.rawDataLength = rawDataLength; + } + + public long getRawDataLength() { + return rawDataLength; + } + + public Path getPath() { + return path; + } + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1438306&r1=1438305&r2=1438306&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Fri Jan 25 03:13:36 2013 @@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapred.MapOutputFile; import 
org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -112,7 +113,9 @@ class OnDiskMapOutput<K, V> extends MapO @Override public void commit() throws IOException { localFS.rename(tmpOutputPath, outputPath); - merger.closeOnDiskFile(outputPath); + CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath, + getSize()); + merger.closeOnDiskFile(compressAwarePath); } @Override Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1437841-1438305