Author: shv
Date: Fri Feb  4 03:39:34 2011
New Revision: 1067078

URL: http://svn.apache.org/viewvc?rev=1067078&view=rev
Log:
MAPREDUCE-2188. Merge 1067077 from trunk to branch 0.22.

Modified:
    hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
    
hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java

Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1067078&r1=1067077&r2=1067078&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Fri Feb  4 03:39:34 2011
@@ -463,6 +463,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2077. Resolve name clash in the deprecated 
     o.a.h.util.MemoryCalculatorPlugin (Luke Lu via shv)
 
+    MAPREDUCE-2188. The new API MultithreadedMapper doesn't initialize
+    RecordReader. (Owen O'Malley via shv)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: 
hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=1067078&r1=1067077&r2=1067078&view=diff
==============================================================================
--- 
hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
 (original)
+++ 
hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
 Fri Feb  4 03:39:34 2011
@@ -246,6 +246,7 @@ public class MultithreadedMapper<K1, V1,
     private Mapper<K1,V1,K2,V2> mapper;
     private Context subcontext;
     private Throwable throwable;
+    private RecordReader<K1,V1> reader = new SubMapRecordReader();
 
     MapRunner(Context context) throws IOException, InterruptedException {
       mapper = ReflectionUtils.newInstance(mapClass, 
@@ -253,22 +254,20 @@ public class MultithreadedMapper<K1, V1,
       MapContext<K1, V1, K2, V2> mapContext = 
         new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), 
                                            outer.getTaskAttemptID(),
-                                           new SubMapRecordReader(),
+                                           reader,
                                            new SubMapRecordWriter(), 
                                            context.getOutputCommitter(),
                                            new SubMapStatusReporter(),
                                            outer.getInputSplit());
       subcontext = new WrappedMapper<K1, V1, K2, 
V2>().getMapContext(mapContext);
-    }
-
-    public Throwable getThrowable() {
-      return throwable;
+      reader.initialize(context.getInputSplit(), context);
     }
 
     @Override
     public void run() {
       try {
         mapper.run(subcontext);
+        reader.close();
       } catch (Throwable ie) {
         throwable = ie;
       }


Reply via email to