Repository: flink Updated Branches: refs/heads/master 61b1c0a6c -> d94dfde57
[core] Add tests for DelimitedInputFormat's handling of records across split boundaries Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d94dfde5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d94dfde5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d94dfde5 Branch: refs/heads/master Commit: d94dfde570632e6114dbca44c6464f204c198866 Parents: efa62df Author: Stephan Ewen <[email protected]> Authored: Sun Jul 12 21:34:32 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 15:14:41 2015 +0200 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 18 +- .../api/common/io/DelimitedInputFormatTest.java | 270 +++++++++++++++---- 2 files changed, 239 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 7fc42ab..a1b045f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -80,7 +80,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { static { loadGloablConfigParams(); } - protected static final void loadGloablConfigParams() { + protected static void loadGloablConfigParams() { int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY, ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES); int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY, @@ -570,9 +570,19 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { return true; } } + // else .. - int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength; - if (this.splitLength <= 0) { + int toRead; + if (this.splitLength > 0) { + // if we have more data, read that + toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength; + } + else { + // if we have exhausted our split, we need to complete the current record, or read one + // more across the next split. + // the reason is that the next split will skip over the beginning until it finds the first + // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the + // previous split. toRead = this.readBuffer.length; this.overLimit = true; } @@ -592,7 +602,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { } // ============================================================================================ - // Parameterization via configuration + // Parametrization via configuration // ============================================================================================ // ------------------------------------- Config Keys ------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 4af394f..599a640 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.io; import static org.junit.Assert.assertArrayEquals; @@ -34,6 +33,9 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; @@ -45,10 +47,6 @@ import org.junit.Test; public class DelimitedInputFormatTest { - protected Configuration config; - - protected File tempFile; - private final DelimitedInputFormat<String> format = new MyTextInputFormat(); // -------------------------------------------------------------------------------------------- @@ -56,7 +54,6 @@ public class DelimitedInputFormatTest { @Before public void setup() { this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read")); - this.config = new Configuration(); } @After @@ -64,22 +61,20 @@ public class DelimitedInputFormatTest { if (this.format != null) { this.format.close(); } - if (this.tempFile != null) { - this.tempFile.delete(); - } } // -------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------- @Test public void testConfigure() { - this.config.setString("delimited-format.delimiter", "\n"); + Configuration cfg = new Configuration(); + cfg.setString("delimited-format.delimiter", "\n"); - format.configure(this.config); + format.configure(cfg); assertEquals("\n", new String(format.getDelimiter())); - this.config.setString("delimited-format.delimiter", "&-&"); - format.configure(this.config); + cfg.setString("delimited-format.delimiter", "&-&"); + format.configure(cfg); assertEquals("&-&", new String(format.getDelimiter())); } @@ -125,11 +120,58 @@ public class DelimitedInputFormatTest { assertEquals(bufferSize, format.getBufferSize()); } - /** - * Tests simple delimited parsing with a custom delimiter. - */ @Test - public void testRead() { + public void testReadWithoutTrailingDelimiter() throws IOException { + // 2. test case + final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; + final FileInputSplit split = createTempFile(myString); + + final Configuration parameters = new Configuration(); + // default delimiter = '\n' + + format.configure(parameters); + format.open(split); + + String first = format.nextRecord(null); + String second = format.nextRecord(null); + + assertNotNull(first); + assertNotNull(second); + + assertEquals("my key|my val$$$my key2", first); + assertEquals("$$ctd.$$|my value2", second); + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadWithTrailingDelimiter() throws IOException { + // 2. test case + final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n"; + final FileInputSplit split = createTempFile(myString); + + final Configuration parameters = new Configuration(); + // default delimiter = '\n' + + format.configure(parameters); + format.open(split); + + String first = format.nextRecord(null); + String second = format.nextRecord(null); + + assertNotNull(first); + assertNotNull(second); + + assertEquals("my key|my val$$$my key2", first); + assertEquals("$$ctd.$$|my value2", second); + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadCustomDelimiter() { try { final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; final FileInputSplit split = createTempFile(myString); @@ -156,42 +198,180 @@ public class DelimitedInputFormatTest { fail(e.getMessage()); } } - + + /** + * Tests that the records are read correctly when the split boundary is in the middle of a record. + */ @Test - public void testRead2() throws IOException { - // 2. test case - final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n"; - final FileInputSplit split = createTempFile(myString); - - final Configuration parameters = new Configuration(); - // default delimiter = '\n' - - format.configure(parameters); - format.open(split); + public void testReadOverSplitBoundariesUnaligned() { + try { + final String myString = "value1\nvalue2\nvalue3"; + final FileInputSplit split = createTempFile(myString); + + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); - String first = format.nextRecord(null); - String second = format.nextRecord(null); - - assertNotNull(first); - assertNotNull(second); - - assertEquals("my key|my val$$$my key2", first); - assertEquals("$$ctd.$$|my value2", second); - - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); + final Configuration parameters = new Configuration(); + + format.configure(parameters); + format.open(split1); + + assertEquals("value1", format.nextRecord(null)); + assertEquals("value2", format.nextRecord(null)); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + format.open(split2); + + assertEquals("value3", format.nextRecord(null)); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Tests that the correct number of records is read when the split boundary is exact at the record boundary. + */ + @Test + public void testReadWithBufferSizeIsMultple() { + try { + final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; + final FileInputSplit split = createTempFile(myString); + + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); + + final Configuration parameters = new Configuration(); + + format.setBufferSize(2 * ((int) split1.getLength())); + format.configure(parameters); + + String next; + int count = 0; + + // read split 1 + format.open(split1); + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; + } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + // this one must have read one too many, because the next split will skipp the trailing remainder + // which happens to be one full record + assertEquals(3, count); + + // read split 2 + format.open(split2); + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; + } + format.close(); + + assertEquals(4, count); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testReadExactlyBufferSize() { + try { + final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; + + final FileInputSplit split = createTempFile(myString); + final Configuration parameters = new Configuration(); + + format.setBufferSize((int) split.getLength()); + format.configure(parameters); + format.open(split); + + String next; + int count = 0; + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; + } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + + assertEquals(4, count); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testReadRecordsLargerThanBuffer() { + try { + final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" + + "bbbbbbbbbbbbbbbbbbbbbbbbb\n" + + "ccccccccccccccccccc\n" + + "ddddddddddddddddddddddddddddddddddd\n"; + + final FileInputSplit split = createTempFile(myString); + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); + + final Configuration parameters = new Configuration(); + + format.setBufferSize(8); + format.configure(parameters); + + String next; + List<String> result = new ArrayList<String>(); + + + format.open(split1); + while ((next = format.nextRecord(null)) != null) { + result.add(next); + } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + format.open(split2); + while ((next = format.nextRecord(null)) != null) { + result.add(next); + } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + assertEquals(4, result.size()); + assertEquals(Arrays.asList(myString.split("\n")), result); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } - - private FileInputSplit createTempFile(String contents) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp"); - this.tempFile.deleteOnExit(); + private static FileInputSplit createTempFile(String contents) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(this.tempFile)); + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); wrt.write(contents); wrt.close(); - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); }
