Author: shv
Date: Fri Feb 4 03:39:34 2011
New Revision: 1067078

URL: http://svn.apache.org/viewvc?rev=1067078&view=rev
Log:
MAPREDUCE-2188. Merge 1067077 from trunk to branch 0.22.

Modified:
    hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java

Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1067078&r1=1067077&r2=1067078&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Fri Feb 4 03:39:34 2011
@@ -463,6 +463,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2077. Resolve name clash in the deprecated
     o.a.h.util.MemoryCalculatorPlugin (Luke Lu via shv)
 
+    MAPREDUCE-2188. The new API MultithreadedMapper doesn't initialize
+    RecordReader. (Owen O'Malley via shv)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=1067078&r1=1067077&r2=1067078&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Fri Feb 4 03:39:34 2011
@@ -246,6 +246,7 @@ public class MultithreadedMapper<K1, V1,
     private Mapper<K1,V1,K2,V2> mapper;
     private Context subcontext;
     private Throwable throwable;
+    private RecordReader<K1,V1> reader = new SubMapRecordReader();
 
     MapRunner(Context context) throws IOException, InterruptedException {
       mapper = ReflectionUtils.newInstance(mapClass,
@@ -253,22 +254,20 @@ public class MultithreadedMapper<K1, V1,
       MapContext<K1, V1, K2, V2> mapContext =
         new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(),
                                            outer.getTaskAttemptID(),
-                                           new SubMapRecordReader(),
+                                           reader,
                                            new SubMapRecordWriter(),
                                            context.getOutputCommitter(),
                                            new SubMapStatusReporter(),
                                            outer.getInputSplit());
       subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
-    }
-
-    public Throwable getThrowable() {
-      return throwable;
+      reader.initialize(context.getInputSplit(), context);
     }
 
     @Override
     public void run() {
       try {
         mapper.run(subcontext);
+        reader.close();
       } catch (Throwable ie) {
         throwable = ie;
       }