Author: acmurthy Date: Fri Dec 21 07:38:49 2012 New Revision: 1424816 URL: http://svn.apache.org/viewvc?rev=1424816&view=rev Log: MAPREDUCE-4888. Fixed an off-by-one error in NLineInputFormat which dropped data. Contributed by Vinod K V.
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1424816&r1=1424815&r2=1424816&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Fri Dec 21 07:38:49 2012 @@ -411,6 +411,9 @@ Release 1.1.2 - Unreleased MAPREDUCE-4859. Fixed TestRecoveryManager. (acmurthy) + MAPREDUCE-4888. Fixed NLineInputFormat one-off error which dropped data. + (vinodkv via acmurthy) + Release 1.1.1 - 2012.11.18 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=1424816&r1=1424815&r2=1424816&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Dec 21 07:38:49 2012 @@ -97,25 +97,14 @@ public class NLineInputFormat extends Fi numLines++; length += num; if (numLines == N) { - // NLineInputFormat uses LineRecordReader, which always reads (and - // consumes) at least one character out of its upper split - // boundary. So to make sure that each mapper gets N lines, we - // move back the upper split limits of each split by one character - // here. 
- if (begin == 0) { - splits.add(new FileSplit(fileName, begin, length - 1, - new String[] {})); - } else { - splits.add(new FileSplit(fileName, begin - 1, length, - new String[] {})); - } + splits.add(createFileSplit(fileName, begin, length)); begin += length; length = 0; numLines = 0; } } if (numLines != 0) { - splits.add(new FileSplit(fileName, begin, length, new String[]{})); + splits.add(createFileSplit(fileName, begin, length)); } } finally { @@ -127,6 +116,23 @@ public class NLineInputFormat extends Fi return splits.toArray(new FileSplit[splits.size()]); } + /** + * NLineInputFormat uses LineRecordReader, which always reads + * (and consumes) at least one character out of its upper split + * boundary. So to make sure that each mapper gets N lines, we + * move back the upper split limits of each split + * by one character here. + * @param fileName Path of file + * @param begin the position of the first byte in the file to process + * @param length number of bytes in InputSplit + * @return FileSplit + */ + protected static FileSplit createFileSplit(Path fileName, long begin, long length) { + return (begin == 0) + ? 
new FileSplit(fileName, begin, length - 1, new String[] {}) + : new FileSplit(fileName, begin - 1, length, new String[] {}); + } + public void configure(JobConf conf) { N = conf.getInt("mapred.line.input.format.linespermap", 1); } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=1424816&r1=1424815&r2=1424816&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Fri Dec 21 07:38:49 2012 @@ -48,9 +48,6 @@ public class TestLineInputFormat extends JobConf job = new JobConf(); Path file = new Path(workDir, "test.txt"); - int seed = new Random().nextInt(); - Random random = new Random(seed); - localFs.delete(workDir, true); FileInputFormat.setInputPaths(job, workDir); int numLinesPerMap = 5; @@ -58,7 +55,8 @@ public class TestLineInputFormat extends // for a variety of lengths for (int length = 0; length < MAX_LENGTH; - length += random.nextInt(MAX_LENGTH/10) + 1) { + length += 1) { + System.out.println("Processing file of length "+length); // create a file with length entries Writer writer = new OutputStreamWriter(localFs.create(file)); try { @@ -69,14 +67,21 @@ public class TestLineInputFormat extends } finally { writer.close(); } - checkFormat(job, numLinesPerMap); + int lastN = 0; + if (length != 0) { + lastN = length % numLinesPerMap; + if (lastN == 0) { + lastN = numLinesPerMap; + } + } + checkFormat(job, numLinesPerMap, lastN); } } // A reporter that does nothing private static final Reporter voidReporter = Reporter.NULL; - void checkFormat(JobConf job, int expectedN) throws IOException{ + void checkFormat(JobConf job, int expectedN, int 
lastN) throws IOException{ NLineInputFormat format = new NLineInputFormat(); format.configure(job); int ignoredNumSplits = 1; @@ -84,7 +89,8 @@ public class TestLineInputFormat extends // check all splits except last one int count = 0; - for (int j = 0; j < splits.length -1; j++) { + for (int j = 0; j < splits.length; j++) { + System.out.println("Processing split "+splits[j]); assertEquals("There are no split locations", 0, splits[j].getLocations().length); RecordReader<LongWritable, Text> reader = @@ -102,16 +108,22 @@ public class TestLineInputFormat extends try { count = 0; while (reader.next(key, value)) { + System.out.println("Got "+key+" "+value+" at count "+count+" of split "+j); count++; } } finally { reader.close(); } - assertEquals("number of lines in split is " + expectedN , - expectedN, count); + if ( j == splits.length - 1) { + assertEquals("number of lines in split(" + j + ") is wrong" , + lastN, count); + } else { + assertEquals("number of lines in split(" + j + ") is wrong" , + expectedN, count); + } } } - + public static void main(String[] args) throws Exception { new TestLineInputFormat().testFormat(); }