Author: tomwhite
Date: Wed Feb 27 10:40:00 2013
New Revision: 1450723
URL: http://svn.apache.org/r1450723
Log: MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. Contributed by Sandy Ryza.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1450723&r1=1450722&r2=1450723&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 27 10:40:00 2013 @@ -189,6 +189,9 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-4951. Container preemption interpreted as task failure. (Sandy Ryza via tomwhite) + MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER. + (Sandy Ryza via tomwhite) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1450723&r1=1450722&r2=1450723&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Wed Feb 27 10:40:00 2013 @@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> impl combineCollector.setWriter(writer); 
combineAndSpill(rIter, reduceCombineInputCounter); } + writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength()); - writer.close(); LOG.info(reduceId + " Merge of the " + noInMemorySegments + @@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> impl mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); + writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength()); - writer.close(); } catch (IOException e) { localFS.delete(outputPath, true); throw e; @@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> impl keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase); - final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, + Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, keyClass, valueClass, codec, null); try { Merger.writeFile(rIter, writer, reporter, job); - // add to list of final disk outputs. + writer.close(); onDiskMapOutputs.add(new CompressAwarePath(outputPath, writer.getRawLength())); + writer = null; + // add to list of final disk outputs. } catch (IOException e) { if (null != outputPath) { try {