Author: tucu
Date: Tue Jan 29 19:38:40 2013
New Revision: 1440076

URL: http://svn.apache.org/viewvc?rev=1440076&view=rev
Log:
MAPREDUCE-2264. Job status exceeds 100% in some cases. (devaraj.k and sandyr 
via tucu)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1440076&r1=1440075&r2=1440076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jan 29 19:38:40 2013
@@ -275,6 +275,9 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan
     via sseth)
 
+    MAPREDUCE-2264. Job status exceeds 100% in some cases. 
+    (devaraj.k and sandyr via tucu)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1440076&r1=1440075&r2=1440076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Jan 29 19:38:40 2013
@@ -218,6 +218,7 @@ public class Merger {  
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
+    long rawDataLength = -1;
     
     Counters.Counter mapOutputsCounter = null;
 
@@ -234,6 +235,15 @@ public class Merger {  
       this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, 
preserve, 
            mergedMapOutputsCounter);
     }
+    
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        CompressionCodec codec, boolean preserve,
+        Counters.Counter mergedMapOutputsCounter, long rawDataLength)
+            throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, 
preserve, 
+          mergedMapOutputsCounter);
+      this.rawDataLength = rawDataLength;
+    }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
@@ -261,6 +271,11 @@ public class Merger {  
     public Segment(Reader<K, V> reader, boolean preserve) {
       this(reader, preserve, null);
     }
+
+    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
+      this(reader, preserve, null);
+      this.rawDataLength = rawDataLength;
+    }
     
     public Segment(Reader<K, V> reader, boolean preserve, 
                    Counters.Counter mapOutputsCounter) {
@@ -300,6 +315,10 @@ public class Merger {  
         segmentLength : reader.getLength();
     }
     
+    public long getRawDataLength() {
+      return (rawDataLength > 0) ? rawDataLength : getLength();
+    }
+
     boolean nextRawKey() throws IOException {
       return reader.nextRawKey(key);
     }
@@ -633,7 +652,7 @@ public class Merger {  
             totalBytesProcessed = 0;
             totalBytes = 0;
             for (int i = 0; i < segmentsToMerge.size(); i++) {
-              totalBytes += segmentsToMerge.get(i).getLength();
+              totalBytes += segmentsToMerge.get(i).getRawDataLength();
             }
           }
           if (totalBytes != 0) //being paranoid
@@ -702,7 +721,7 @@ public class Merger {  
           // size will match(almost) if combiner is not called in merge.
           long inputBytesOfThisMerge = totalBytesProcessed -
                                        bytesProcessedInPrevMerges;
-          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
           if (totalBytes != 0) {
             progPerByte = 1.0f / (float)totalBytes;
           }
@@ -768,7 +787,7 @@ public class Merger {  
       for (int i = 0; i < numSegments; i++) {
         // Not handling empty segments here assuming that it would not affect
         // much in calculation of mergeProgress.
-        segmentSizes.add(segments.get(i).getLength());
+        segmentSizes.add(segments.get(i).getRawDataLength());
       }
       
       // If includeFinalMerge is true, allow the following while loop iterate

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1440076&r1=1440075&r2=1440076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 29 19:38:40 2013
@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> impl
     new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
   
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
@@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> impl
              inMemoryMergedMapOutputs.size());
   }
   
-  public synchronized void closeOnDiskFile(Path file) {
+  public synchronized void closeOnDiskFile(CompressAwarePath file) {
     onDiskMapOutputs.add(file);
     
     if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> impl
     List<InMemoryMapOutput<K, V>> memory = 
       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    List<CompressAwarePath> disk = new 
ArrayList<CompressAwarePath>(onDiskMapOutputs);
     return finalMerge(jobConf, rfs, memory, disk);
   }
    
@@ -456,6 +456,7 @@ public class MergeManagerImpl<K, V> impl
                         codec, null);
 
       RawKeyValueIterator rIter = null;
+      CompressAwarePath compressAwarePath;
       try {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                  " segments...");
@@ -474,6 +475,8 @@ public class MergeManagerImpl<K, V> impl
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength());
         writer.close();
 
         LOG.info(reduceId +  
@@ -489,12 +492,12 @@ public class MergeManagerImpl<K, V> impl
       }
 
       // Note the output of the merge
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(compressAwarePath);
     }
 
   }
   
-  private class OnDiskMerger extends MergeThread<Path,K,V> {
+  private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
     
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -503,7 +506,7 @@ public class MergeManagerImpl<K, V> impl
     }
     
     @Override
-    public void merge(List<Path> inputs) throws IOException {
+    public void merge(List<CompressAwarePath> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -518,7 +521,7 @@ public class MergeManagerImpl<K, V> impl
                " map outputs on disk. Triggering merge...");
       
       // 1. Prepare the list of files to be merged. 
-      for (Path file : inputs) {
+      for (CompressAwarePath file : inputs) {
         approxOutputSize += localFS.getFileStatus(file).getLen();
       }
 
@@ -536,6 +539,7 @@ public class MergeManagerImpl<K, V> impl
                         (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
       RawKeyValueIterator iter  = null;
+      CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
@@ -548,13 +552,15 @@ public class MergeManagerImpl<K, V> impl
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength());
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
       }
 
-      closeOnDiskFile(outputPath);
+      closeOnDiskFile(compressAwarePath);
 
       LOG.info(reduceId +
           " Finished merging " + inputs.size() + 
@@ -653,7 +659,7 @@ public class MergeManagerImpl<K, V> impl
 
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<InMemoryMapOutput<K,V>> 
inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
+                                       List<CompressAwarePath> onDiskMapOutputs
                                        ) throws IOException {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
@@ -712,7 +718,8 @@ public class MergeManagerImpl<K, V> impl
         try {
           Merger.writeFile(rIter, writer, reporter, job);
           // add to list of final disk outputs.
-          onDiskMapOutputs.add(outputPath);
+          onDiskMapOutputs.add(new CompressAwarePath(outputPath,
+              writer.getRawLength()));
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -742,15 +749,19 @@ public class MergeManagerImpl<K, V> impl
     // segments on disk
     List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
     long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new 
Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " + 
-          fs.getFileStatus(file).getLen());
+    long rawBytes = inMemToDiskBytes;
+    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
+        new CompressAwarePath[onDiskMapOutputs.size()]);
+    for (CompressAwarePath file : onDisk) {
+      long fileLength = fs.getFileStatus(file).getLen();
+      onDiskBytes += fileLength;
+      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : 
fileLength;
+
+      LOG.debug("Disk file: " + file + " Length is " + fileLength);
       diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                          (file.toString().endsWith(
                                              Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
+                                          null : mergedMapOutputsCounter), 
file.getRawDataLength()
                                         ));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
@@ -786,7 +797,7 @@ public class MergeManagerImpl<K, V> impl
         return diskMerge;
       }
       finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
     }
     return Merger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
@@ -794,4 +805,27 @@ public class MergeManagerImpl<K, V> impl
                  null);
   
   }
+
+  static class CompressAwarePath extends Path {
+    private long rawDataLength;
+
+    public CompressAwarePath(Path path, long rawDataLength) {
+      super(path.toUri());
+      this.rawDataLength = rawDataLength;
+    }
+
+    public long getRawDataLength() {
+      return rawDataLength;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+      return super.equals(other);
+    }
+    
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1440076&r1=1440075&r2=1440076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Tue Jan 29 19:38:40 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import 
org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -112,7 +113,9 @@ class OnDiskMapOutput<K, V> extends MapO
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
-    merger.closeOnDiskFile(outputPath);
+    CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
+        getSize());
+    merger.closeOnDiskFile(compressAwarePath);
   }
   
   @Override

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1440076&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Tue Jan 29 19:38:40 2013
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for the reduce-side merge code: the in-memory merger created by
+ * {@link MergeManagerImpl} and the progress reported by {@link Merger}
+ * when merging compressed and uncompressed segments.
+ */
+public class TestMerger {
+
+  private Configuration conf;
+  private JobConf jobConf;
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    jobConf = new JobConf();
+    fs = FileSystem.getLocal(conf);
+  }
+
+  /** Recursively deletes the first configured local directory. */
+  @After
+  public void cleanup() throws IOException {
+    fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+  }
+
+  /**
+   * Runs the in-memory merger over two map outputs and checks that it
+   * produces a single on-disk output holding every record in key order.
+   */
+  @Test
+  public void testInMemoryMerger() throws IOException {
+    JobID jobId = new JobID("a", 0);
+    TaskAttemptID reduceId = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 0), 0);
+    TaskAttemptID mapId1 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 1), 0);
+    TaskAttemptID mapId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 2), 0);
+
+    LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+    MergeManagerImpl<Text, Text> mergeManager =
+        new MergeManagerImpl<Text, Text>(reduceId, jobConf, fs, lda,
+            Reporter.NULL, null, null, null, null, null, null, null,
+            new Progress(), new MROutputFiles());
+
+    // Write two map outputs. "banana" goes into the second one so that both
+    // merge inputs are non-empty and the merged output interleaves them.
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put("apple", "disgusting");
+    map1.put("carrot", "delicious");
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map2.put("banana", "pretty good");
+    byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+    InMemoryMapOutput<Text, Text> mapOutput1 =
+        new InMemoryMapOutput<Text, Text>(conf, mapId1, mergeManager,
+            mapOutputBytes1.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput2 =
+        new InMemoryMapOutput<Text, Text>(conf, mapId2, mergeManager,
+            mapOutputBytes2.length, null, true);
+    System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+        mapOutputBytes1.length);
+    System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+        mapOutputBytes2.length);
+
+    // create merger and run merge
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
+        mergeManager.createInMemoryMerger();
+    List<InMemoryMapOutput<Text, Text>> mapOutputs =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs.add(mapOutput1);
+    mapOutputs.add(mapOutput2);
+
+    inMemoryMerger.merge(mapOutputs);
+
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+    Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
+
+    List<String> keys = new ArrayList<String>();
+    List<String> values = new ArrayList<String>();
+    readOnDiskMapOutput(conf, fs, outPath, keys, values);
+    Assert.assertEquals(Arrays.asList("apple", "banana", "carrot"), keys);
+    Assert.assertEquals(
+        Arrays.asList("disgusting", "pretty good", "delicious"), values);
+  }
+
+  /**
+   * Serializes {@code keysToValues} into an in-memory IFile, in the map's
+   * iteration order, and returns the bytes written.
+   */
+  private byte[] writeMapOutput(Configuration conf,
+      Map<String, String> keysToValues) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+        Text.class, Text.class, null, null);
+    for (Map.Entry<String, String> entry : keysToValues.entrySet()) {
+      writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
+    }
+    writer.close();
+    return baos.toByteArray();
+  }
+
+  /**
+   * Reads every record of the IFile at {@code path}, appending the decoded
+   * keys and values, in file order, to {@code keys} and {@code values}.
+   * The reader is closed even if reading fails.
+   */
+  private void readOnDiskMapOutput(Configuration conf, FileSystem fs,
+      Path path, List<String> keys, List<String> values) throws IOException {
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
+        path, null, null);
+    try {
+      DataInputBuffer keyBuff = new DataInputBuffer();
+      DataInputBuffer valueBuff = new DataInputBuffer();
+      Text key = new Text();
+      Text value = new Text();
+      while (reader.nextRawKey(keyBuff)) {
+        key.readFields(keyBuff);
+        keys.add(key.toString());
+        reader.nextRawValue(valueBuff);
+        value.readFields(valueBuff);
+        values.add(value.toString());
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /** Runs the progress check on {@link #getCompressedSegments()}. */
+  @Test
+  public void testCompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getCompressedSegments());
+  }
+
+  /** Runs the progress check on {@link #getUncompressedSegments()}. */
+  @Test
+  public void testUncompressed() throws IOException {
+    testMergeShouldReturnProperProgress(getUncompressedSegments());
+  }
+
+  /**
+   * Merges {@code segments} with a merge factor of 2 and asserts that the
+   * returned iterator reports a progress of exactly 1.0.
+   */
+  @SuppressWarnings( { "deprecation", "unchecked" })
+  public void testMergeShouldReturnProperProgress(
+      List<Segment<Text, Text>> segments) throws IOException {
+    Path tmpDir = new Path("localpath");
+    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
+    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
+    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
+    Counter readsCounter = new Counter();
+    Counter writesCounter = new Counter();
+    Progress mergePhase = new Progress();
+    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+        valueClass, segments, 2, tmpDir, comparator, getReporter(),
+        readsCounter, writesCounter, mergePhase);
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
+  }
+
+  /** Returns a {@link Progressable} whose progress() does nothing. */
+  private Progressable getReporter() {
+    Progressable reporter = new Progressable() {
+      @Override
+      public void progress() {
+      }
+    };
+    return reporter;
+  }
+
+  /**
+   * Builds segments without an explicit raw data length.
+   * NOTE(review): the loop starts at 1 with bound {@code i < 1}, so its body
+   * never runs and this always returns an empty list; the progress tests
+   * therefore do not exercise any segment yet.
+   */
+  private List<Segment<Text, Text>> getUncompressedSegments()
+      throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 1; i++) {
+      segments.add(getUncompressedSegment(i));
+    }
+    return segments;
+  }
+
+  /**
+   * Builds segments that carry an explicit raw data length.
+   * NOTE(review): the loop starts at 1 with bound {@code i < 1}, so its body
+   * never runs and this always returns an empty list.
+   */
+  private List<Segment<Text, Text>> getCompressedSegments()
+      throws IOException {
+    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
+    for (int i = 1; i < 1; i++) {
+      segments.add(getCompressedSegment(i));
+    }
+    return segments;
+  }
+
+  /** Wraps mock reader {@code i} in a segment with no raw data length. */
+  private Segment<Text, Text> getUncompressedSegment(int i)
+      throws IOException {
+    return new Segment<Text, Text>(getReader(i), false);
+  }
+
+  /** Wraps mock reader {@code i} in a segment with raw data length 3000. */
+  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
+    return new Segment<Text, Text>(getReader(i), false, 3000L);
+  }
+
+  /**
+   * Returns a mock reader for segment {@code i}: successive getPosition()
+   * calls return 0, 10, then 20, and keys/values come from
+   * {@link #getKeyAnswer} and {@link #getValueAnswer}.
+   */
+  @SuppressWarnings("unchecked")
+  private Reader<Text, Text> getReader(int i) throws IOException {
+    Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getPosition()).thenReturn(0L).thenReturn(10L).thenReturn(
+        20L);
+    when(
+        readerMock.nextRawKey(any(DataInputBuffer.class)))
+        .thenAnswer(getKeyAnswer("Segment" + i));
+    doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
+        any(DataInputBuffer.class));
+
+    return readerMock;
+  }
+
+  /**
+   * Returns an answer for nextRawKey that fills the key buffer and returns
+   * true on the first two calls, then returns false on the third.
+   */
+  private Answer<?> getKeyAnswer(final String segmentName) {
+    return new Answer<Object>() {
+      int i = 0;
+
+      public Boolean answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
+        if (i++ == 2) {
+          return false;
+        }
+        key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Returns an answer for nextRawValue that fills the value buffer on the
+   * first two calls and leaves it untouched on the third.
+   */
+  private Answer<?> getValueAnswer(final String segmentName) {
+    return new Answer<Void>() {
+      int i = 0;
+
+      public Void answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        DataInputBuffer value = (DataInputBuffer) args[0];
+        if (i++ == 2) {
+          return null;
+        }
+        value.reset(("Segment Value " + segmentName + i).getBytes(), 20);
+        return null;
+      }
+    };
+  }
+}
\ No newline at end of file


Reply via email to