Repository: hadoop
Updated Branches:
  refs/heads/trunk e556c35b0 -> 7fd00b3db


MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate 
records (wilfreds via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fd00b3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fd00b3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fd00b3d

Branch: refs/heads/trunk
Commit: 7fd00b3db4b7d73afd41276ba9a06ec06a0e1762
Parents: e556c35
Author: Robert Kanter <rkan...@apache.org>
Authored: Wed Nov 25 17:03:38 2015 -0800
Committer: Robert Kanter <rkan...@apache.org>
Committed: Wed Nov 25 17:03:38 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/util/LineReader.java |   9 +
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../lib/input/UncompressedSplitLineReader.java  |   5 +
 .../hadoop/mapred/TestLineRecordReader.java     | 230 +++++++++++++-----
 .../lib/input/TestLineRecordReader.java         | 237 ++++++++++++++-----
 5 files changed, 361 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 900215a..153953d 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -333,6 +333,10 @@ public class LineReader implements Closeable {
         //appending the ambiguous characters (refer case 2.2)
         str.append(recordDelimiterBytes, 0, ambiguousByteCount);
         ambiguousByteCount = 0;
+        // since it is now certain that the split did not split a delimiter we
+        // should not read the next record: clear the flag otherwise duplicate
+        // records could be generated
+        unsetNeedAdditionalRecordAfterSplit();
       }
       if (appendLength > 0) {
         str.append(buffer, startPosn, appendLength);
@@ -380,4 +384,9 @@ public class LineReader implements Closeable {
   protected int getBufferSize() {
     return bufferSize;
   }
+
+  protected void unsetNeedAdditionalRecordAfterSplit() {
+    // needed for custom multi byte line delimiters only
+    // see MAPREDUCE-6549 for details
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt 
b/hadoop-mapreduce-project/CHANGES.txt
index c6e80e7..503e687 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -650,6 +650,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of
     target. (Akira AJISAKA via junping_du)
 
+    MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
+    duplicate records (wilfreds via rkanter)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 38491b0..6d495ef 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -97,4 +97,9 @@ public class UncompressedSplitLineReader extends 
SplitLineReader {
   public boolean needAdditionalRecordAfterSplit() {
     return !finished && needAdditionalRecord;
   }
+
+  @Override
+  protected void unsetNeedAdditionalRecordAfterSplit() {
+    needAdditionalRecord = false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index d33a614..f0cf9f5 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -334,12 +334,72 @@ public class TestLineRecordReader {
   @Test
   public void testUncompressedInput() throws Exception {
     Configuration conf = new Configuration();
-    String inputData = "abc+++def+++ghi+++"
-        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    // single char delimiter, best case
+    String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
     Path inputFile = createInputFile(conf, inputData);
-    conf.set("textinputformat.record.delimiter", "+++");
-    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
-      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+    conf.set("textinputformat.record.delimiter", "+");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter, best case
+    inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "|+|");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // single char delimiter with empty records
+    inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with empty records
+    inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "|+|");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with starting part of the delimiter in the data
+    inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+-");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with newline as start of the delimiter
+    inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "\n+");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with newline in delimiter and in data
+    inputData = "abc\ndef+\nghi+\njkl\nmno";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+\n");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
         conf.setInt("io.file.buffer.size", bufferSize);
         testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
       }
@@ -363,80 +423,126 @@ public class TestLineRecordReader {
   public void testUncompressedInputCustomDelimiterPosValue()
       throws Exception {
     Configuration conf = new Configuration();
-    String inputData = "1234567890ab12ab345";
-    Path inputFile = createInputFile(conf, inputData);
     conf.setInt("io.file.buffer.size", 10);
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    String delimiter = "ab";
+    String inputData = "abcdefghij++kl++mno";
+    Path inputFile = createInputFile(conf, inputData);
+    String delimiter = "++";
     byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
-    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    // the first split must contain two records to make sure that it also pulls
+    // in the record from the 2nd split
+    int splitLength = 15;
+    FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) 
null);
     LineRecordReader reader = new LineRecordReader(conf, split,
         recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
-    reader.next(key, value);
-    // Get first record:"1234567890"
-    assertEquals(10, value.getLength());
-    // Position should be 12 right after "1234567890ab"
-    assertEquals(12, reader.getPos());
-    reader.next(key, value);
-    // Get second record:"12"
-    assertEquals(2, value.getLength());
-    // Position should be 16 right after "1234567890ab12ab"
-    assertEquals(16, reader.getPos());
-    reader.next(key, value);
-    // Get third record:"345"
-    assertEquals(3, value.getLength());
-    // Position should be 19 right after "1234567890ab12ab345"
-    assertEquals(19, reader.getPos());
+    // Get first record: "abcdefghij"
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong length for record value", 10, value.getLength());
+    // Position should be 12 right after "abcdefghij++"
+    assertEquals("Wrong position after record read", 12, reader.getPos());
+    // Get second record: "kl"
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong length for record value", 2, value.getLength());
+    // Position should be 16 right after "abcdefghij++kl++"
+    assertEquals("Wrong position after record read", 16, reader.getPos());
+    // Get third record: "mno"
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // Position should be 19 right after "abcdefghij++kl++mno"
+    assertEquals("Wrong position after record read", 19, reader.getPos());
     assertFalse(reader.next(key, value));
-    assertEquals(19, reader.getPos());
-
-    split = new FileSplit(inputFile, 15, 4, (String[])null);
-    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
-    // No record is in the second split because the second split dropped
+    assertEquals("Wrong position after record read", 19, reader.getPos());
+    reader.close();
+    // No record is in the second split because the second split will drop
     // the first record, which was already reported by the first split.
-    // The position should be 19 right after "1234567890ab12ab345"
-    assertEquals(19, reader.getPos());
-    assertFalse(reader.next(key, value));
-    assertEquals(19, reader.getPos());
-
-    inputData = "123456789aab";
-    inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    split = new FileSplit(inputFile, splitLength,
+        inputData.length() - splitLength, (String[]) null);
     reader = new LineRecordReader(conf, split, recordDelimiterBytes);
-    reader.next(key, value);
-    // Get first record:"123456789a"
-    assertEquals(10, value.getLength());
-    // Position should be 12 right after "123456789aab"
-    assertEquals(12, reader.getPos());
-    assertFalse(reader.next(key, value));
-    assertEquals(12, reader.getPos());
+    // The position should be 19 right after "abcdefghij++kl++mno" and should
+    // not change
+    assertEquals("Wrong position after record read", 19, reader.getPos());
+    assertFalse("Unexpected record returned", reader.next(key, value));
+    assertEquals("Wrong position after record read", 19, reader.getPos());
+    reader.close();
 
-    inputData = "123456789a";
+    // multi char delimiter with starting part of the delimiter in the data
+    inputData = "abcd+efgh++ijk++mno";
     inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    splitLength = 5;
+    split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
     reader = new LineRecordReader(conf, split, recordDelimiterBytes);
-    reader.next(key, value);
-    // Get first record:"123456789a"
-    assertEquals(10, value.getLength());
-    // Position should be 10 right after "123456789a"
-    assertEquals(10, reader.getPos());
+    // Get first record: "abcd+efgh"
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong position after record read", 11, reader.getPos());
+    assertEquals("Wrong length for record value", 9, value.getLength());
+    // should have jumped over the delimiter, no record
+    assertFalse("Unexpected record returned", reader.next(key, value));
+    assertEquals("Wrong position after record read", 11, reader.getPos());
+    reader.close();
+    // next split: check for duplicate or dropped records
+    split = new FileSplit(inputFile, splitLength,
+        inputData.length() - splitLength, (String[]) null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    // Get second record: "ijk" first in this split
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong position after record read", 16, reader.getPos());
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // Get third record: "mno" second in this split
+    assertTrue("Expected record got nothing", reader.next(key, value));
+    assertEquals("Wrong position after record read", 19, reader.getPos());
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // should be at the end of the input
     assertFalse(reader.next(key, value));
-    assertEquals(10, reader.getPos());
+    assertEquals("Wrong position after record read", 19, reader.getPos());
+    reader.close();
 
-    inputData = "123456789ab";
+    inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
     inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 11, (String[])null);
-    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
-    reader.next(key, value);
-    // Get first record:"123456789"
-    assertEquals(9, value.getLength());
-    // Position should be 11 right after "123456789ab"
-    assertEquals(11, reader.getPos());
-    assertFalse(reader.next(key, value));
-    assertEquals(11, reader.getPos());
+    delimiter = "|+|";
+    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    // walking over the buffer and split sizes checks for proper processing
+    // of the ambiguous bytes of the delimiter
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
+        reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+        // Get first record: "abcd|efgh" always possible
+        assertTrue("Expected record got nothing", reader.next(key, value));
+        assertTrue("abcd|efgh".equals(value.toString()));
+        assertEquals("Wrong position after record read", 9, value.getLength());
+        // Position should be 12 right after "|+|"
+        int recordPos = 12;
+        assertEquals("Wrong position after record read", recordPos,
+            reader.getPos());
+        // get the next record: "ij|kl" if the split/buffer allows it
+        if (reader.next(key, value)) {
+          // check the record info: "ij|kl"
+          assertTrue("ij|kl".equals(value.toString()));
+          // Position should be 20 right after "|+|"
+          recordPos = 20;
+          assertEquals("Wrong position after record read", recordPos,
+              reader.getPos());
+        }
+        // get the third record: "mno|pqr" if the split/buffer allows it
+        if (reader.next(key, value)) {
+          // check the record info: "mno|pqr"
+          assertTrue("mno|pqr".equals(value.toString()));
+          // Position should be 27 at the end of the string now
+          recordPos = inputData.length();
+          assertEquals("Wrong position after record read", recordPos,
+              reader.getPos());
+        }
+        // no more records can be read we should still be at the last position
+        assertFalse("Unexpected record returned", reader.next(key, value));
+        assertEquals("Wrong position after record read", recordPos,
+            reader.getPos());
+        reader.close();
+      }
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fd00b3d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index dfe8b5d..6819af7 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.input;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -320,16 +321,76 @@ public class TestLineRecordReader {
   @Test
   public void testUncompressedInput() throws Exception {
     Configuration conf = new Configuration();
-    String inputData = "abc+++def+++ghi+++"
-        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    // single char delimiter, best case
+    String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
     Path inputFile = createInputFile(conf, inputData);
-    conf.set("textinputformat.record.delimiter", "+++");
+    conf.set("textinputformat.record.delimiter", "+");
     for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
       for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
         conf.setInt("io.file.buffer.size", bufferSize);
         testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
       }
     }
+    // multi char delimiter, best case
+    inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "|+|");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // single char delimiter with empty records
+    inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with empty records
+    inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "|+|");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with starting part of the delimiter in the data
+    inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+-");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with newline as start of the delimiter
+    inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "\n+");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
+    // multi char delimiter with newline in delimiter and in data
+    inputData = "abc\ndef+\nghi+\njkl\nmno";
+    inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+\n");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), 
inputFile);
+      }
+    }
   }
 
   @Test
@@ -349,91 +410,145 @@ public class TestLineRecordReader {
   public void testUncompressedInputCustomDelimiterPosValue()
       throws Exception {
     Configuration conf = new Configuration();
-    String inputData = "1234567890ab12ab345";
-    Path inputFile = createInputFile(conf, inputData);
     conf.setInt("io.file.buffer.size", 10);
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    String delimiter = "ab";
+    String inputData = "abcdefghij++kl++mno";
+    Path inputFile = createInputFile(conf, inputData);
+    String delimiter = "++";
     byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
-    FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
+    int splitLength = 15;
+    FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null);
     TaskAttemptContext context = new TaskAttemptContextImpl(conf,
         new TaskAttemptID());
     LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
-    LongWritable key;
-    Text value;
-    reader.nextKeyValue();
-    key = reader.getCurrentKey();
-    value = reader.getCurrentValue();
-    // Get first record:"1234567890"
-    assertEquals(10, value.getLength());
-    assertEquals(0, key.get());
-    reader.nextKeyValue();
-    // Get second record:"12"
-    assertEquals(2, value.getLength());
-    // Key should be 12 right after "1234567890ab"
-    assertEquals(12, key.get());
-    reader.nextKeyValue();
-    // Get third record:"345"
-    assertEquals(3, value.getLength());
-    // Key should be 16 right after "1234567890ab12ab"
-    assertEquals(16, key.get());
+    // Get first record: "abcdefghij"
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
+    LongWritable key = reader.getCurrentKey();
+    Text value = reader.getCurrentValue();
+    assertEquals("Wrong length for record value", 10, value.getLength());
+    assertEquals("Wrong position after record read", 0, key.get());
+    // Get second record: "kl"
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
+    assertEquals("Wrong length for record value", 2, value.getLength());
+    // Key should be 12 right after "abcdefghij++"
+    assertEquals("Wrong position after record read", 12, key.get());
+    // Get third record: "mno"
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // Key should be 16 right after "abcdefghij++kl++"
+    assertEquals("Wrong position after record read", 16, key.get());
     assertFalse(reader.nextKeyValue());
-    // Key should be 19 right after "1234567890ab12ab345"
-    assertEquals(19, key.get());
-
-    split = new FileSplit(inputFile, 15, 4, (String[])null);
+    // Key should be 19 right after "abcdefghij++kl++mno"
+    assertEquals("Wrong position after record read", 19, key.get());
+    // after refresh should be empty
+    key = reader.getCurrentKey();
+    assertNull("Unexpected key returned", key);
+    reader.close();
+    split = new FileSplit(inputFile, splitLength,
+        inputData.length() - splitLength, (String[])null);
     reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     // No record is in the second split because the second split dropped
     // the first record, which was already reported by the first split.
-    assertFalse(reader.nextKeyValue());
+    assertFalse("Unexpected record returned", reader.nextKeyValue());
+    key = reader.getCurrentKey();
+    assertNull("Unexpected key returned", key);
+    reader.close();
 
-    inputData = "123456789aab";
+    // multi char delimiter with starting part of the delimiter in the data
+    inputData = "abcd+efgh++ijk++mno";
     inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 12, (String[])null);
+    splitLength = 5;
+    split = new FileSplit(inputFile, 0, splitLength, (String[])null);
     reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
-    reader.nextKeyValue();
+    // Get first record: "abcd+efgh"
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
     key = reader.getCurrentKey();
     value = reader.getCurrentValue();
-    // Get first record:"123456789a"
-    assertEquals(10, value.getLength());
-    assertEquals(0, key.get());
+    assertEquals("Wrong position after record read", 0, key.get());
+    assertEquals("Wrong length for record value", 9, value.getLength());
+    // should have jumped over the delimiter, no record
     assertFalse(reader.nextKeyValue());
-    // Key should be 12 right after "123456789aab"
-    assertEquals(12, key.get());
-
-    inputData = "123456789a";
-    inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 10, (String[])null);
+    assertEquals("Wrong position after record read", 11, key.get());
+    // after refresh should be empty
+    key = reader.getCurrentKey();
+    assertNull("Unexpected key returned", key);
+    reader.close();
+    // next split: check for duplicate or dropped records
+    split = new FileSplit(inputFile, splitLength,
+        inputData.length () - splitLength, (String[])null);
     reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
-    reader.nextKeyValue();
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
     key = reader.getCurrentKey();
     value = reader.getCurrentValue();
-    // Get first record:"123456789a"
-    assertEquals(10, value.getLength());
-    assertEquals(0, key.get());
+    // Get second record: "ijk" first in this split
+    assertEquals("Wrong position after record read", 11, key.get());
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // Get third record: "mno" second in this split
+    assertTrue("Expected record got nothing", reader.nextKeyValue());
+    assertEquals("Wrong position after record read", 16, key.get());
+    assertEquals("Wrong length for record value", 3, value.getLength());
+    // should be at the end of the input
     assertFalse(reader.nextKeyValue());
-    // Key should be 10 right after "123456789a"
-    assertEquals(10, key.get());
+    assertEquals("Wrong position after record read", 19, key.get());
+    reader.close();
 
-    inputData = "123456789ab";
+    inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
     inputFile = createInputFile(conf, inputData);
-    split = new FileSplit(inputFile, 0, 11, (String[])null);
-    reader = new LineRecordReader(recordDelimiterBytes);
-    reader.initialize(split, context);
-    reader.nextKeyValue();
-    key = reader.getCurrentKey();
-    value = reader.getCurrentValue();
-    // Get first record:"123456789"
-    assertEquals(9, value.getLength());
-    assertEquals(0, key.get());
-    assertFalse(reader.nextKeyValue());
-    // Key should be 11 right after "123456789ab"
-    assertEquals(11, key.get());
+    delimiter = "|+|";
+    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    // walking over the buffer and split sizes checks for proper processing
+    // of the ambiguous bytes of the delimiter
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        // track where we are in the inputdata
+        int keyPosition = 0;
+        conf.setInt("io.file.buffer.size", bufferSize);
+        split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
+        reader = new LineRecordReader(recordDelimiterBytes);
+        reader.initialize(split, context);
+        // Get the first record: "abcd|efgh" always possible
+        assertTrue("Expected record got nothing", reader.nextKeyValue());
+        key = reader.getCurrentKey();
+        value = reader.getCurrentValue();
+        assertTrue("abcd|efgh".equals(value.toString()));
+        // Position should be 0 right at the start
+        assertEquals("Wrong position after record read", keyPosition,
+            key.get());
+        // Position should be 12 right after the first "|+|"
+        keyPosition = 12;
+        // get the next record: "ij|kl" if the split/buffer allows it
+        if (reader.nextKeyValue()) {
+          // check the record info: "ij|kl"
+          assertTrue("ij|kl".equals(value.toString()));
+          assertEquals("Wrong position after record read", keyPosition,
+              key.get());
+          // Position should be 20 after the second "|+|"
+          keyPosition = 20;
+        }
+        // get the third record: "mno|pqr" if the split/buffer allows it
+        if (reader.nextKeyValue()) {
+          // check the record info: "mno|pqr"
+          assertTrue("mno|pqr".equals(value.toString()));
+          assertEquals("Wrong position after record read", keyPosition,
+              key.get());
+          // Position should be the end of the input
+          keyPosition = inputData.length();
+        }
+        assertFalse("Unexpected record returned", reader.nextKeyValue());
+        // no more records can be read we should be at the last position
+        assertEquals("Wrong position after record read", keyPosition,
+            key.get());
+        // after refresh should be empty
+        key = reader.getCurrentKey();
+        assertNull("Unexpected key returned", key);
+        reader.close();
+      }
+    }
   }
 
   @Test

Reply via email to