Repository: hadoop Updated Branches: refs/heads/branch-2.7 850d6a3fd -> 521196874
MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong position/key information for uncompressed input sometimes. Contributed by Zhihai Xu (cherry picked from commit 58d1a02b8d66b1d2a6ac2158be32bd35ad2e69bd) Conflicts: hadoop-mapreduce-project/CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52119687 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52119687 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52119687 Branch: refs/heads/branch-2.7 Commit: 521196874c7a53d356f972a8ffb8fe03d9ca14fb Parents: 850d6a3 Author: Jason Lowe <jl...@apache.org> Authored: Fri Sep 18 15:10:38 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Fri Sep 18 15:10:38 2015 +0000 ---------------------------------------------------------------------- .../java/org/apache/hadoop/util/LineReader.java | 17 +- hadoop-mapreduce-project/CHANGES.txt | 4 + .../lib/input/UncompressedSplitLineReader.java | 31 +--- .../hadoop/mapred/TestLineRecordReader.java | 138 ++++++++++++++++ .../lib/input/TestLineRecordReader.java | 161 +++++++++++++++++++ 5 files changed, 316 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java index 1d1b569..900215a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java @@ -303,7 +303,10 @@ public class LineReader implements Closeable { startPosn = bufferPosn = 0; bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0); if (bufferLength <= 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); + if (ambiguousByteCount > 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + bytesConsumed += ambiguousByteCount; + } break; // EOF } } @@ -325,13 +328,13 @@ public class LineReader implements Closeable { if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } + bytesConsumed += ambiguousByteCount; + if (appendLength >= 0 && ambiguousByteCount > 0) { + //appending the ambiguous characters (refer case 2.2) + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + ambiguousByteCount = 0; + } if (appendLength > 0) { - if (ambiguousByteCount > 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); - //appending the ambiguous characters (refer case 2.2) - bytesConsumed += ambiguousByteCount; - ambiguousByteCount=0; - } str.append(buffer, startPosn, appendLength); txtLength += appendLength; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 55f4f0a..cd98559 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -34,6 +34,10 @@ Release 2.7.2 - UNRELEASED multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira AJISAKA via jlowe) + MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong + position/key information for uncompressed input sometimes. (Zhihai Xu via + jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java index 52fb7b0..38491b0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java @@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader { private long totalBytesRead = 0; private boolean finished = false; private boolean usingCRLF; - private int unusedBytes = 0; - private int lastBytesRead = 0; public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf, byte[] recordDelimiterBytes, long splitLength) throws IOException { @@ -59,7 +57,6 @@ public class UncompressedSplitLineReader extends SplitLineReader { (int)(splitLength - totalBytesRead)); } int bytesRead = in.read(buffer, 0, maxBytesToRead); - lastBytesRead = bytesRead; // If the split ended in the middle of a record delimiter then we need // to read one additional record, as the consumer of the next split will @@ -83,39 +80,17 @@ public class UncompressedSplitLineReader extends SplitLineReader { @Override public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { - long bytesRead = 0; + int bytesRead = 0; if (!finished) { // only allow at most one more record to be read after the stream // reports the split ended if (totalBytesRead > splitLength) { finished = true; } - bytesRead = totalBytesRead; - int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume); - bytesRead = totalBytesRead - bytesRead; - // No records left. - if (bytesConsumed == 0 && bytesRead == 0) { - return 0; - } - - int bufferSize = getBufferSize(); - - // Add the remaining buffer size not used for the last call - // of fillBuffer method. - if (lastBytesRead <= 0) { - bytesRead += bufferSize; - } else if (bytesRead > 0) { - bytesRead += bufferSize - lastBytesRead; - } - - // Adjust the size of the buffer not used for this record. - // The size is carried over for the next calculation. - bytesRead += unusedBytes; - unusedBytes = bufferSize - getBufferPosn(); - bytesRead -= unusedBytes; + bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume); } - return (int) bytesRead; + return bytesRead; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index ffba2d9..e6350c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -350,4 +351,141 @@ public class TestLineRecordReader { } } } + + @Test + public void testUncompressedInputCustomDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890ab12ab345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + String delimiter = "ab"; + byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split, + recordDelimiterBytes); + LongWritable key = new LongWritable(); + Text value = new Text(); + reader.next(key, value); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + // Position should be 12 right after "1234567890ab" + assertEquals(12, reader.getPos()); + reader.next(key, value); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Position should be 16 right after "1234567890ab12ab" + assertEquals(16, reader.getPos()); + reader.next(key, value); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Position should be 19 right after "1234567890ab12ab345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // No record is in the second split because the second split dropped + // the first record, which was already reported by the first split. + // The position should be 19 right after "1234567890ab12ab345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + inputData = "123456789aab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + // Position should be 12 right after "123456789aab" + assertEquals(12, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(12, reader.getPos()); + + inputData = "123456789a"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 10, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + // Position should be 10 right after "123456789a" + assertEquals(10, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(10, reader.getPos()); + + inputData = "123456789ab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 11, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + // Position should be 11 right after "123456789ab" + assertEquals(11, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(11, reader.getPos()); + } + + @Test + public void testUncompressedInputDefaultDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890\r\n12\r\n345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split, + null); + LongWritable key = new LongWritable(); + Text value = new Text(); + reader.next(key, value); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + // Position should be 12 right after "1234567890\r\n" + assertEquals(12, reader.getPos()); + reader.next(key, value); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Position should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, reader.getPos()); + assertFalse(reader.next(key, value)); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(conf, split, null); + // The second split dropped the first record "\n" + // The position should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, reader.getPos()); + reader.next(key, value); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Position should be 19 right after "1234567890\r\n12\r\n345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + inputData = "123456789\r\r\n"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(conf, split, null); + reader.next(key, value); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + // Position should be 10 right after "123456789\r" + assertEquals(10, reader.getPos()); + reader.next(key, value); + // Get second record:"" + assertEquals(0, value.getLength()); + // Position should be 12 right after "123456789\r\r\n" + assertEquals(12, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(12, reader.getPos()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index 6c86739..9794d23 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.lib.input; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -37,6 +38,8 @@ import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.Decompressor; @@ -334,4 +337,162 @@ public class TestLineRecordReader { } } } + + @Test + public void testUncompressedInputCustomDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890ab12ab345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + String delimiter = "ab"; + byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, + new TaskAttemptID()); + LineRecordReader reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + LongWritable key; + Text value; + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Key should be 12 right after "1234567890ab" + assertEquals(12, key.get()); + reader.nextKeyValue(); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Key should be 16 right after "1234567890ab12ab" + assertEquals(16, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 19 right after "1234567890ab12ab345" + assertEquals(19, key.get()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + // No record is in the second split because the second split dropped + // the first record, which was already reported by the first split. + assertFalse(reader.nextKeyValue()); + + inputData = "123456789aab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 12 right after "123456789aab" + assertEquals(12, key.get()); + + inputData = "123456789a"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 10, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 10 right after "123456789a" + assertEquals(10, key.get()); + + inputData = "123456789ab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 11, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 11 right after "123456789ab" + assertEquals(11, key.get()); + } + + @Test + public void testUncompressedInputDefaultDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890\r\n12\r\n345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, + new TaskAttemptID()); + LineRecordReader reader = new LineRecordReader(null); + reader.initialize(split, context); + LongWritable key; + Text value; + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Key should be 12 right after "1234567890\r\n" + assertEquals(12, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, key.get()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(null); + reader.initialize(split, context); + // The second split dropped the first record "\n" + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Key should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 19 right after "1234567890\r\n12\r\n345" + assertEquals(19, key.get()); + + inputData = "123456789\r\r\n"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(null); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"" + assertEquals(0, value.getLength()); + // Key should be 10 right after "123456789\r" + assertEquals(10, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 12 right after "123456789\r\r\n" + assertEquals(12, key.get()); + } }