Repository: flink
Updated Branches:
  refs/heads/master 61b1c0a6c -> d94dfde57


[core] Add tests for DelimitedInputFormat's handling of records across split 
boundaries


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d94dfde5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d94dfde5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d94dfde5

Branch: refs/heads/master
Commit: d94dfde570632e6114dbca44c6464f204c198866
Parents: efa62df
Author: Stephan Ewen <[email protected]>
Authored: Sun Jul 12 21:34:32 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  18 +-
 .../api/common/io/DelimitedInputFormatTest.java | 270 +++++++++++++++----
 2 files changed, 239 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 7fc42ab..a1b045f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -80,7 +80,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        
        static { loadGloablConfigParams(); }
        
-       protected static final void loadGloablConfigParams() {
+       protected static void loadGloablConfigParams() {
                int maxSamples = 
GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
                                
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
                int minSamples = 
GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
@@ -570,9 +570,19 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                                return true;
                        }
                }
+               
                // else ..
-               int toRead = this.splitLength > this.readBuffer.length ? 
this.readBuffer.length : (int) this.splitLength;
-               if (this.splitLength <= 0) {
+               int toRead;
+               if (this.splitLength > 0) {
+                       // if we have more data, read that
+                       toRead = this.splitLength > this.readBuffer.length ? 
this.readBuffer.length : (int) this.splitLength;
+               }
+               else {
+                       // if we have exhausted our split, we need to complete 
the current record, or read one
+                       // more across the next split.
+                       // the reason is that the next split will skip over the 
beginning until it finds the first
+                       // delimiter, discarding it as an incomplete chunk of 
data that belongs to the last record in the
+                       // previous split.
                        toRead = this.readBuffer.length;
                        this.overLimit = true;
                }
@@ -592,7 +602,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        }
        
        // 
============================================================================================
-       //  Parameterization via configuration
+       //  Parametrization via configuration
        // 
============================================================================================
        
        // ------------------------------------- Config Keys 
------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d94dfde5/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 4af394f..599a640 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -34,6 +33,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -45,10 +47,6 @@ import org.junit.Test;
 
 public class DelimitedInputFormatTest {
        
-       protected Configuration config;
-       
-       protected File tempFile;
-       
        private final DelimitedInputFormat<String> format = new 
MyTextInputFormat();
        
        // 
--------------------------------------------------------------------------------------------
@@ -56,7 +54,6 @@ public class DelimitedInputFormatTest {
        @Before
        public void setup() {
                this.format.setFilePath(new 
Path("file:///some/file/that/will/not/be/read"));
-               this.config = new Configuration();
        }
        
        @After
@@ -64,22 +61,20 @@ public class DelimitedInputFormatTest {
                if (this.format != null) {
                        this.format.close();
                }
-               if (this.tempFile != null) {
-                       this.tempFile.delete();
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------
        // 
--------------------------------------------------------------------------------------------
        @Test
        public void testConfigure() {
-               this.config.setString("delimited-format.delimiter", "\n");
+               Configuration cfg = new Configuration();
+               cfg.setString("delimited-format.delimiter", "\n");
                
-               format.configure(this.config);
+               format.configure(cfg);
                assertEquals("\n", new String(format.getDelimiter()));
 
-               this.config.setString("delimited-format.delimiter", "&-&");
-               format.configure(this.config);
+               cfg.setString("delimited-format.delimiter", "&-&");
+               format.configure(cfg);
                assertEquals("&-&", new String(format.getDelimiter()));
        }
        
@@ -125,11 +120,58 @@ public class DelimitedInputFormatTest {
                assertEquals(bufferSize, format.getBufferSize());
        }
 
-       /**
-        * Tests simple delimited parsing with a custom delimiter.
-        */
        @Test
-       public void testRead() {
+       public void testReadWithoutTrailingDelimiter() throws IOException {
+               // 2. test case
+               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2";
+               final FileInputSplit split = createTempFile(myString);
+
+               final Configuration parameters = new Configuration();
+               // default delimiter = '\n'
+
+               format.configure(parameters);
+               format.open(split);
+
+               String first = format.nextRecord(null);
+               String second = format.nextRecord(null);
+
+               assertNotNull(first);
+               assertNotNull(second);
+
+               assertEquals("my key|my val$$$my key2", first);
+               assertEquals("$$ctd.$$|my value2", second);
+
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+       }
+       
+       @Test
+       public void testReadWithTrailingDelimiter() throws IOException {
+               // 2. test case
+               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2\n";
+               final FileInputSplit split = createTempFile(myString);
+
+               final Configuration parameters = new Configuration();
+               // default delimiter = '\n'
+
+               format.configure(parameters);
+               format.open(split);
+
+               String first = format.nextRecord(null);
+               String second = format.nextRecord(null);
+
+               assertNotNull(first);
+               assertNotNull(second);
+
+               assertEquals("my key|my val$$$my key2", first);
+               assertEquals("$$ctd.$$|my value2", second);
+
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+       }
+       
+       @Test
+       public void testReadCustomDelimiter() {
                try {
                        final String myString = "my key|my val$$$my 
key2\n$$ctd.$$|my value2";
                        final FileInputSplit split = createTempFile(myString);
@@ -156,42 +198,180 @@ public class DelimitedInputFormatTest {
                        fail(e.getMessage());
                }
        }
-       
+
+       /**
+        * Tests that the records are read correctly when the split boundary is 
in the middle of a record.
+        */
        @Test
-       public void testRead2() throws IOException {
-               // 2. test case
-               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2\n";
-               final FileInputSplit split = createTempFile(myString);
-               
-               final Configuration parameters = new Configuration();
-               // default delimiter = '\n'
-               
-               format.configure(parameters);
-               format.open(split);
+       public void testReadOverSplitBoundariesUnaligned() {
+               try {
+                       final String myString = "value1\nvalue2\nvalue3";
+                       final FileInputSplit split = createTempFile(myString);
+                       
+                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
 
-               String first = format.nextRecord(null);
-               String second = format.nextRecord(null);
-               
-               assertNotNull(first);
-               assertNotNull(second);
-               
-               assertEquals("my key|my val$$$my key2", first);
-               assertEquals("$$ctd.$$|my value2", second);
-               
-               assertNull(format.nextRecord(null));
-               assertTrue(format.reachedEnd());
+                       final Configuration parameters = new Configuration();
+                       
+                       format.configure(parameters);
+                       format.open(split1);
+                       
+                       assertEquals("value1", format.nextRecord(null));
+                       assertEquals("value2", format.nextRecord(null));
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+                       
+                       format.close();
+                       format.open(split2);
+
+                       assertEquals("value3", format.nextRecord(null));
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+                       
+                       format.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Tests that the correct number of records is read when the split 
boundary is exactly at the record boundary.
+        */
+       @Test
+       public void testReadWithBufferSizeIsMultple() {
+               try {
+                       final String myString = 
"aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+                       final FileInputSplit split = createTempFile(myString);
+
+                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+
+                       final Configuration parameters = new Configuration();
+
+                       format.setBufferSize(2 * ((int) split1.getLength()));
+                       format.configure(parameters);
+
+                       String next;
+                       int count = 0;
+
+                       // read split 1
+                       format.open(split1);
+                       while ((next = format.nextRecord(null)) != null) {
+                               assertEquals(7, next.length());
+                               count++;
+                       }
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+                       format.close();
+                       
+                       // this one must have read one too many, because the 
next split will skip the trailing remainder
+                       // which happens to be one full record
+                       assertEquals(3, count);
+
+                       // read split 2
+                       format.open(split2);
+                       while ((next = format.nextRecord(null)) != null) {
+                               assertEquals(7, next.length());
+                               count++;
+                       }
+                       format.close();
+
+                       assertEquals(4, count);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testReadExactlyBufferSize() {
+               try {
+                       final String myString = 
"aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+                       
+                       final FileInputSplit split = createTempFile(myString);
+                       final Configuration parameters = new Configuration();
+                       
+                       format.setBufferSize((int) split.getLength());
+                       format.configure(parameters);
+                       format.open(split);
+
+                       String next;
+                       int count = 0;
+                       while ((next = format.nextRecord(null)) != null) {
+                               assertEquals(7, next.length());
+                               count++;
+                       }
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+
+                       format.close();
+                       
+                       assertEquals(4, count);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testReadRecordsLargerThanBuffer() {
+               try {
+                       final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+                                                                       
"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+                                                                       
"ccccccccccccccccccc\n" +
+                                                                       
"ddddddddddddddddddddddddddddddddddd\n";
+
+                       final FileInputSplit split = createTempFile(myString);
+                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
+                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+                       
+                       final Configuration parameters = new Configuration();
+
+                       format.setBufferSize(8);
+                       format.configure(parameters);
+
+                       String next;
+                       List<String> result = new ArrayList<String>();
+                       
+                       
+                       format.open(split1);
+                       while ((next = format.nextRecord(null)) != null) {
+                               result.add(next);
+                       }
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+                       format.close();
+
+                       format.open(split2);
+                       while ((next = format.nextRecord(null)) != null) {
+                               result.add(next);
+                       }
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+                       format.close();
+                       
+                       assertEquals(4, result.size());
+                       assertEquals(Arrays.asList(myString.split("\n")), 
result);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
        
-       
-       private FileInputSplit createTempFile(String contents) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", "tmp");
-               this.tempFile.deleteOnExit();
+       private static FileInputSplit createTempFile(String contents) throws 
IOException {
+               File tempFile = File.createTempFile("test_contents", "tmp");
+               tempFile.deleteOnExit();
                
-               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(this.tempFile));
+               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
                wrt.write(contents);
                wrt.close();
                
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
+               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
        }
        
        

Reply via email to