Author: shv Date: Fri Feb 4 03:38:00 2011 New Revision: 1067077 URL: http://svn.apache.org/viewvc?rev=1067077&view=rev Log: MAPREDUCE-2188. The new API MultithreadedMapper doesn't initialize RecordReader. Contributed by Owen O'Malley.
Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1067077&r1=1067076&r2=1067077&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Fri Feb 4 03:38:00 2011 @@ -535,6 +535,9 @@ Release 0.22.0 - Unreleased MAPREDUCE-2077. Resolve name clash in the deprecated o.a.h.util.MemoryCalculatorPlugin (Luke Lu via shv) + MAPREDUCE-2188. The new API MultithreadedMapper doesn't initialize + RecordReader. (Owen O'Malley via shv) + Release 0.21.1 - Unreleased NEW FEATURES Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=1067077&r1=1067076&r2=1067077&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Fri Feb 4 03:38:00 2011 @@ -246,6 +246,7 @@ public class MultithreadedMapper<K1, V1, private Mapper<K1,V1,K2,V2> mapper; private Context subcontext; private Throwable throwable; + private RecordReader<K1,V1> reader = new SubMapRecordReader(); MapRunner(Context context) throws IOException, InterruptedException { mapper = ReflectionUtils.newInstance(mapClass, @@ -253,22 +254,20 @@ public class MultithreadedMapper<K1, V1, MapContext<K1, V1, K2, V2> mapContext = new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), outer.getTaskAttemptID(), - new SubMapRecordReader(), + reader, new SubMapRecordWriter(), context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext); - } - - public Throwable getThrowable() { - return throwable; + reader.initialize(context.getInputSplit(), context); } @Override public void run() { try { mapper.run(subcontext); + reader.close(); } catch (Throwable ie) { throwable = ie; }