Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 850d6a3fd -> 521196874


MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong 
position/key information for uncompressed input sometimes. Contributed by 
Zhihai Xu
(cherry picked from commit 58d1a02b8d66b1d2a6ac2158be32bd35ad2e69bd)

Conflicts:

        hadoop-mapreduce-project/CHANGES.txt
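
In essence: when a multi-byte custom delimiter straddles LineReader's
internal buffer boundary, the partially matched ("ambiguous") delimiter
bytes were not always counted as consumed, and the compensation
arithmetic in UncompressedSplitLineReader could mis-correct, yielding a
truncated record or a wrong position/key. For example (mirroring the
new tests below), with delimiter "ab" and a 10-byte io.file.buffer.size,
reading "123456789ab" leaves the 'a' pending at the buffer boundary;
the reader must still report 11 bytes consumed for the record
"123456789".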


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52119687
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52119687
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52119687

Branch: refs/heads/branch-2.7
Commit: 521196874c7a53d356f972a8ffb8fe03d9ca14fb
Parents: 850d6a3
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Sep 18 15:10:38 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Sep 18 15:10:38 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/hadoop/util/LineReader.java |  17 +-
 hadoop-mapreduce-project/CHANGES.txt            |   4 +
 .../lib/input/UncompressedSplitLineReader.java  |  31 +---
 .../hadoop/mapred/TestLineRecordReader.java     | 138 ++++++++++++++++
 .../lib/input/TestLineRecordReader.java         | 161 +++++++++++++++++++
 5 files changed, 316 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 1d1b569..900215a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -303,7 +303,10 @@ public class LineReader implements Closeable {
         startPosn = bufferPosn = 0;
         bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
         if (bufferLength <= 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          if (ambiguousByteCount > 0) {
+            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+            bytesConsumed += ambiguousByteCount;
+          }
           break; // EOF
         }
       }
@@ -325,13 +328,13 @@ public class LineReader implements Closeable {
       if (appendLength > maxLineLength - txtLength) {
         appendLength = maxLineLength - txtLength;
       }
+      bytesConsumed += ambiguousByteCount;
+      if (appendLength >= 0 && ambiguousByteCount > 0) {
+        //appending the ambiguous characters (refer case 2.2)
+        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+        ambiguousByteCount = 0;
+      }
       if (appendLength > 0) {
-        if (ambiguousByteCount > 0) {
-          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-          //appending the ambiguous characters (refer case 2.2)
-          bytesConsumed += ambiguousByteCount;
-          ambiguousByteCount=0;
-        }
         str.append(buffer, startPosn, appendLength);
         txtLength += appendLength;
       }
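
For context on the hunk above: readLine() must return the total bytes
consumed for the record, including delimiter bytes, because callers
derive file positions by accumulating that return value. The fix counts
the ambiguous bytes (a partial delimiter match held back at a buffer
boundary) in bytesConsumed both when the match completes and at EOF. A
minimal standalone sketch of the fixed behavior (not part of the patch;
the class name and buffer size are illustrative):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;

    public class AmbiguousDelimiterSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = "123456789ab".getBytes(StandardCharsets.UTF_8);
        byte[] delimiter = "ab".getBytes(StandardCharsets.UTF_8);
        // A 10-byte buffer forces the two-byte delimiter to straddle the
        // buffer boundary: 'a' arrives in the first fill, 'b' in the second.
        LineReader reader =
            new LineReader(new ByteArrayInputStream(data), 10, delimiter);
        Text record = new Text();
        long pos = 0;
        int consumed;
        while ((consumed = reader.readLine(record, Integer.MAX_VALUE,
            Integer.MAX_VALUE)) > 0) {
          pos += consumed;
          // With the fix this prints record="123456789" pos=11; the byte
          // held back at the buffer boundary is included in the count.
          System.out.println("record=\"" + record + "\" pos=" + pos);
        }
        reader.close();
      }
    }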

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 55f4f0a..cd98559 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -34,6 +34,10 @@ Release 2.7.2 - UNRELEASED
     multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
     AJISAKA via jlowe)
 
+    MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong
+    position/key information for uncompressed input sometimes. (Zhihai Xu via
+    jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 52fb7b0..38491b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
   private long totalBytesRead = 0;
   private boolean finished = false;
   private boolean usingCRLF;
-  private int unusedBytes = 0;
-  private int lastBytesRead = 0;
 
   public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
       byte[] recordDelimiterBytes, long splitLength) throws IOException {
@@ -59,7 +57,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
                                 (int)(splitLength - totalBytesRead));
     }
     int bytesRead = in.read(buffer, 0, maxBytesToRead);
-    lastBytesRead = bytesRead;
 
     // If the split ended in the middle of a record delimiter then we need
     // to read one additional record, as the consumer of the next split will
@@ -83,39 +80,17 @@ public class UncompressedSplitLineReader extends SplitLineReader {
   @Override
   public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
       throws IOException {
-    long bytesRead = 0;
+    int bytesRead = 0;
     if (!finished) {
       // only allow at most one more record to be read after the stream
       // reports the split ended
       if (totalBytesRead > splitLength) {
         finished = true;
       }
-      bytesRead = totalBytesRead;
-      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
-      bytesRead = totalBytesRead - bytesRead;
 
-      // No records left.
-      if (bytesConsumed == 0 && bytesRead == 0) {
-        return 0;
-      }
-
-      int bufferSize = getBufferSize();
-
-      // Add the remaining buffer size not used for the last call
-      // of fillBuffer method.
-      if (lastBytesRead <= 0) {
-        bytesRead += bufferSize;
-      } else if (bytesRead > 0) {
-        bytesRead += bufferSize - lastBytesRead;
-      }
-
-      // Adjust the size of the buffer not used for this record.
-      // The size is carried over for the next calculation.
-      bytesRead += unusedBytes;
-      unusedBytes = bufferSize - getBufferPosn();
-      bytesRead -= unusedBytes;
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
     }
-    return (int) bytesRead;
+    return bytesRead;
   }
 
   @Override
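
With the buffer-accounting fields gone, readLine() here simply reports
the byte count consumed by the base class; the only split-specific
logic left is permitting one extra record once the stream passes the
split boundary (the reader of the next split drops its first record). A
condensed sketch of how a record reader drives this API (simplified
from what LineRecordReader does; names are illustrative, not the exact
source):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;

    public final class SplitScanSketch {
      /**
       * Emits every record starting inside [start, end), plus the one
       * extra record past the boundary when the reader asks for it. The
       * key and position come purely from accumulating readLine()'s
       * return value, which is why fixing bytesConsumed in LineReader is
       * sufficient to fix getPos()/keys.
       */
      static void scanSplit(SplitLineReader in, long start, long end,
          int maxLineLength) throws IOException {
        Text value = new Text();
        long pos = start;
        while (pos <= end || in.needAdditionalRecordAfterSplit()) {
          int newSize = in.readLine(value, maxLineLength, Integer.MAX_VALUE);
          if (newSize == 0) {
            break; // EOF
          }
          long key = pos;  // record key = offset of its first byte
          pos += newSize;  // position advances by bytes consumed
          System.out.println("key=" + key + " value=" + value);
        }
      }
    }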

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index ffba2d9..e6350c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -350,4 +351,141 @@ public class TestLineRecordReader {
       }
     }
   }
+
+  @Test
+  public void testUncompressedInputCustomDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890ab12ab345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    String delimiter = "ab";
+    byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    reader.next(key, value);
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "1234567890ab"
+    assertEquals(12, reader.getPos());
+    reader.next(key, value);
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Position should be 16 right after "1234567890ab12ab"
+    assertEquals(16, reader.getPos());
+    reader.next(key, value);
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Position should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    // No record is in the second split because the second split dropped
+    // the first record, which was already reported by the first split.
+    // The position should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    inputData = "123456789aab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "123456789aab"
+    assertEquals(12, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(12, reader.getPos());
+
+    inputData = "123456789a";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    // Position should be 10 right after "123456789a"
+    assertEquals(10, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(10, reader.getPos());
+
+    inputData = "123456789ab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 11, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    reader.next(key, value);
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    // Position should be 11 right after "123456789ab"
+    assertEquals(11, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(11, reader.getPos());
+  }
+
+  @Test
+  public void testUncompressedInputDefaultDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890\r\n12\r\n345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        null);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    reader.next(key, value);
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    // Position should be 12 right after "1234567890\r\n"
+    assertEquals(12, reader.getPos());
+    reader.next(key, value);
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Position should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, reader.getPos());
+    assertFalse(reader.next(key, value));
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(conf, split, null);
+    // The second split dropped the first record "\n"
+    // The position should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, reader.getPos());
+    reader.next(key, value);
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Position should be 19 right after "1234567890\r\n12\r\n345"
+    assertEquals(19, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(19, reader.getPos());
+
+    inputData = "123456789\r\r\n";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(conf, split, null);
+    reader.next(key, value);
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    // Position should be 10 right after "123456789\r"
+    assertEquals(10, reader.getPos());
+    reader.next(key, value);
+    // Get second record:""
+    assertEquals(0, value.getLength());
+    // Position should be 12 right after "123456789\r\r\n"
+    assertEquals(12, reader.getPos());
+    assertFalse(reader.next(key, value));
+    assertEquals(12, reader.getPos());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52119687/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 6c86739..9794d23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.lib.input;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -37,6 +38,8 @@ import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -334,4 +337,162 @@ public class TestLineRecordReader {
       }
     }
   }
+
+  @Test
+  public void testUncompressedInputCustomDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890ab12ab345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    String delimiter = "ab";
+    byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+        new TaskAttemptID());
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    LongWritable key;
+    Text value;
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Key should be 12 right after "1234567890ab"
+    assertEquals(12, key.get());
+    reader.nextKeyValue();
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Key should be 16 right after "1234567890ab12ab"
+    assertEquals(16, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 19 right after "1234567890ab12ab345"
+    assertEquals(19, key.get());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    // No record is in the second split because the second split dropped
+    // the first record, which was already reported by the first split.
+    assertFalse(reader.nextKeyValue());
+
+    inputData = "123456789aab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 12 right after "123456789aab"
+    assertEquals(12, key.get());
+
+    inputData = "123456789a";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789a"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 10 right after "123456789a"
+    assertEquals(10, key.get());
+
+    inputData = "123456789ab";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 11, (String[])null);
+    reader = new LineRecordReader(recordDelimiterBytes);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    assertEquals(0, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 11 right after "123456789ab"
+    assertEquals(11, key.get());
+  }
+
+  @Test
+  public void testUncompressedInputDefaultDelimiterPosValue()
+      throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "1234567890\r\n12\r\n345";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.setInt("io.file.buffer.size", 10);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf,
+        new TaskAttemptID());
+    LineRecordReader reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    LongWritable key;
+    Text value;
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"1234567890"
+    assertEquals(10, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:"12"
+    assertEquals(2, value.getLength());
+    // Key should be 12 right after "1234567890\r\n"
+    assertEquals(12, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, key.get());
+
+    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    // The second split dropped the first record "\n"
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get third record:"345"
+    assertEquals(3, value.getLength());
+    // Key should be 16 right after "1234567890\r\n12\r\n"
+    assertEquals(16, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 19 right after "1234567890\r\n12\r\n345"
+    assertEquals(19, key.get());
+
+    inputData = "123456789\r\r\n";
+    inputFile = createInputFile(conf, inputData);
+    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    reader = new LineRecordReader(null);
+    reader.initialize(split, context);
+    reader.nextKeyValue();
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    // Get first record:"123456789"
+    assertEquals(9, value.getLength());
+    assertEquals(0, key.get());
+    reader.nextKeyValue();
+    // Get second record:""
+    assertEquals(0, value.getLength());
+    // Key should be 10 right after "123456789\r"
+    assertEquals(10, key.get());
+    assertFalse(reader.nextKeyValue());
+    // Key should be 12 right after "123456789\r\r\n"
+    assertEquals(12, key.get());
+  }
 }
