[core] Minor cleanups in FileInputFormat and DelimitedInputFormat

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efa62df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efa62df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efa62df8

Branch: refs/heads/master
Commit: efa62df879c608daa1750797e2a34ffa928592f9
Parents: 3d5d63f
Author: Stephan Ewen <[email protected]>
Authored: Sun Jul 12 20:26:20 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  9 +-
 .../flink/api/common/io/FileInputFormat.java    | 12 ++-
 .../api/common/io/DelimitedInputFormatTest.java | 98 ++++++++++----------
 .../api/common/io/FileInputFormatTest.java      | 39 ++++++--
 4 files changed, 91 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index d2b4e83..7fc42ab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;
@@ -40,8 +39,8 @@ import com.google.common.base.Charsets;
  * Base implementation for input formats that split the input at a delimiter 
into records.
  * The parsing of the record bytes into the record has to be implemented in the
  * {@link #readRecord(Object, byte[], int, int)} method.
- * <p>
- * The default delimiter is the newline character {@code '\n'}.
+ * 
+ * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
        
@@ -495,7 +494,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                        }
 
                        int startPos = this.readPos;
-                       int count = 0;
+                       int count;
 
                        while (this.readPos < this.limit && i < 
this.delimiter.length) {
                                if ((this.readBuffer[this.readPos++]) == 
this.delimiter[i]) {
@@ -559,7 +558,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
 
        private boolean fillBuffer() throws IOException {
                // special case for reading the whole split.
-               if(this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
+               if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
                        int read = this.stream.read(this.readBuffer, 0, 
readBuffer.length);
                        if (read == -1) {
                                this.stream.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 0584b96..c9c9ec1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -47,12 +47,12 @@ import org.apache.flink.core.fs.Path;
 
 /**
  * The base class for {@link InputFormat}s that read from files. For specific 
input types the 
- * <tt>nextRecord()</tt> and <tt>reachedEnd()</tt> methods need to be 
implemented.
+ * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be 
implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link 
#close()} to
  * change the life cycle behavior.
- * <p>
- * After the {@link #open(FileInputSplit)} method completed, the file input 
data is available
- * from the {@link #stream} field.
+ * 
+ * <p>After the {@link #open(FileInputSplit)} method completed, the file input 
data is available
+ * from the {@link #stream} field.</p>
  */
 public abstract class FileInputFormat<OT> implements InputFormat<OT, 
FileInputSplit> {
        
@@ -245,6 +245,8 @@ public abstract class FileInputFormat<OT> implements 
InputFormat<OT, FileInputSp
                // TODO The job-submission web interface passes empty args (and 
thus empty
                // paths) to compute the preview graph. The following is a 
workaround for
                // this situation and we should fix this.
+               
+               // comment (Stephan Ewen) this should be no longer relevant 
with the current Java/Scala APIs.
                if (filePath.isEmpty()) {
                        setFilePath(new Path());
                        return;
@@ -609,7 +611,7 @@ public abstract class FileInputFormat<OT> implements 
InputFormat<OT, FileInputSp
         * The method may be overridden. Hadoop's FileInputFormat has a similar 
mechanism and applies the
         * same filters by default.
         * 
-        * @param fileStatus
+        * @param fileStatus The file status to check.
         * @return true, if the given file or directory is accepted
         */
        protected boolean acceptFile(FileStatus fileStatus) {

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 9489f16..4af394f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,8 +38,7 @@ import java.io.OutputStreamWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class DelimitedInputFormatTest {
        
        protected File tempFile;
        
-       private final DelimitedInputFormat<Record> format = new 
MyTextInputFormat();
+       private final DelimitedInputFormat<String> format = new 
MyTextInputFormat();
        
        // 
--------------------------------------------------------------------------------------------
 
@@ -90,7 +90,7 @@ public class DelimitedInputFormatTest {
                final int LINE_LENGTH_LIMIT = 12345;
                final int BUFFER_SIZE = 178;
                
-               DelimitedInputFormat<Record> format = new MyTextInputFormat();
+               DelimitedInputFormat<String> format = new MyTextInputFormat();
                format.setDelimiter(DELIMITER);
                format.setNumLineSamples(NUM_LINE_SAMPLES);
                format.setLineLengthLimit(LINE_LENGTH_LIMIT);
@@ -104,7 +104,7 @@ public class DelimitedInputFormatTest {
                
                ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
                @SuppressWarnings("unchecked")
-               DelimitedInputFormat<Record> deserialized = 
(DelimitedInputFormat<Record>) ois.readObject();
+               DelimitedInputFormat<String> deserialized = 
(DelimitedInputFormat<String>) ois.readObject();
                
                assertEquals(NUM_LINE_SAMPLES, 
deserialized.getNumLineSamples());
                assertEquals(LINE_LENGTH_LIMIT, 
deserialized.getLineLengthLimit());
@@ -115,7 +115,7 @@ public class DelimitedInputFormatTest {
        @Test
        public void testOpen() throws IOException {
                final String myString = "my mocked line 1\nmy mocked line 2\n";
-               final FileInputSplit split = createTempFile(myString);  
+               final FileInputSplit split = createTempFile(myString);
                
                int bufferSize = 5;
                format.setBufferSize(bufferSize);
@@ -125,35 +125,42 @@ public class DelimitedInputFormatTest {
                assertEquals(bufferSize, format.getBufferSize());
        }
 
+       /**
+        * Tests simple delimited parsing with a custom delimiter.
+        */
        @Test
-       public void testRead() throws IOException {
-               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2";
-               final FileInputSplit split = createTempFile(myString);
-               
-               final Configuration parameters = new Configuration();
-               
-               format.setDelimiter("$$$");
-               format.configure(parameters);
-               format.open(split);
-               
-               Record theRecord = new Record();
+       public void testRead() {
+               try {
+                       final String myString = "my key|my val$$$my 
key2\n$$ctd.$$|my value2";
+                       final FileInputSplit split = createTempFile(myString);
+                       
+                       final Configuration parameters = new Configuration();
+                       
+                       format.setDelimiter("$$$");
+                       format.configure(parameters);
+                       format.open(split);
+       
+                       String first = format.nextRecord(null);
+                       assertNotNull(first);
+                       assertEquals("my key|my val", first);
 
-               assertNotNull(format.nextRecord(theRecord));
-               assertEquals("my key", theRecord.getField(0, 
StringValue.class).getValue());
-               assertEquals("my val", theRecord.getField(1, 
StringValue.class).getValue());
-               
-               assertNotNull(format.nextRecord(theRecord));
-               assertEquals("my key2\n$$ctd.$$", theRecord.getField(0, 
StringValue.class).getValue());
-               assertEquals("my value2", theRecord.getField(1, 
StringValue.class).getValue());
-               
-               assertNull(format.nextRecord(theRecord));
-               assertTrue(format.reachedEnd());
+                       String second = format.nextRecord(null);
+                       assertNotNull(second);
+                       assertEquals("my key2\n$$ctd.$$|my value2", second);
+                       
+                       assertNull(format.nextRecord(null));
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
        
        @Test
        public void testRead2() throws IOException {
                // 2. test case
-               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2";
+               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2\n";
                final FileInputSplit split = createTempFile(myString);
                
                final Configuration parameters = new Configuration();
@@ -162,20 +169,20 @@ public class DelimitedInputFormatTest {
                format.configure(parameters);
                format.open(split);
 
-               Record theRecord = new Record();
-
-               assertNotNull(format.nextRecord(theRecord));
-               assertEquals("my key", theRecord.getField(0, 
StringValue.class).getValue());
-               assertEquals("my val$$$my key2", theRecord.getField(1, 
StringValue.class).getValue());
+               String first = format.nextRecord(null);
+               String second = format.nextRecord(null);
                
-               assertNotNull(format.nextRecord(theRecord));
-               assertEquals("$$ctd.$$", theRecord.getField(0, 
StringValue.class).getValue());
-               assertEquals("my value2", theRecord.getField(1, 
StringValue.class).getValue());
+               assertNotNull(first);
+               assertNotNull(second);
                
-               assertNull(format.nextRecord(theRecord));
+               assertEquals("my key|my val$$$my key2", first);
+               assertEquals("$$ctd.$$|my value2", second);
+               
+               assertNull(format.nextRecord(null));
                assertTrue(format.reachedEnd());
        }
        
+       
        private FileInputSplit createTempFile(String contents) throws 
IOException {
                this.tempFile = File.createTempFile("test_contents", "tmp");
                this.tempFile.deleteOnExit();
@@ -187,22 +194,13 @@ public class DelimitedInputFormatTest {
                return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
        }
        
-       protected static final class MyTextInputFormat extends 
org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
+       
+       protected static final class MyTextInputFormat extends 
DelimitedInputFormat<String> {
                private static final long serialVersionUID = 1L;
                
-               private final StringValue str1 = new StringValue();
-               private final StringValue str2 = new StringValue();
-               
                @Override
-               public Record readRecord(Record reuse, byte[] bytes, int 
offset, int numBytes) {
-                       String theRecord = new String(bytes, offset, numBytes);
-                       
-                       str1.setValue(theRecord.substring(0, 
theRecord.indexOf('|')));
-                       
str2.setValue(theRecord.substring(theRecord.indexOf('|') + 1));
-                       
-                       reuse.setField(0, str1);
-                       reuse.setField(1, str2);
-                       return reuse;
+               public String readRecord(String reuse, byte[] bytes, int 
offset, int numBytes) {
+                       return new String(bytes, offset, numBytes);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 7e3edc0..5aac540 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -37,8 +38,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the FileInputFormat.
+ */
 public class FileInputFormatTest {
 
+       // 
------------------------------------------------------------------------
+       //  Statistics
+       // 
------------------------------------------------------------------------
+       
        @Test
        public void testGetStatisticsNonExistingFile() {
                try {
@@ -189,6 +199,10 @@ public class FileInputFormatTest {
                        Assert.fail(ex.getMessage());
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Unsplittable input files
+       // 
------------------------------------------------------------------------
        
        // ---- Tests for .deflate ---------
        
@@ -234,6 +248,10 @@ public class FileInputFormatTest {
                        Assert.fail(ex.getMessage());
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Ignored Files
+       // 
------------------------------------------------------------------------
        
        @Test
        public void testIgnoredUnderscoreFiles() {
@@ -243,11 +261,13 @@ public class FileInputFormatTest {
                        // create some accepted, some ignored files
                        
                        File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-                       File f = null;
+                       File f;
                        do {
                                f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-                       } while (f.exists());
-                       f.mkdirs();
+                       }
+                       while (f.exists());
+
+                       assertTrue(f.mkdirs());
                        f.deleteOnExit();
                        
                        File child1 = new File(f, "dataFile1.txt");
@@ -301,11 +321,13 @@ public class FileInputFormatTest {
 
                        // create two accepted and two ignored files
                        File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-                       File f = null;
+                       File f;
                        do {
                                f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-                       } while (f.exists());
-                       f.mkdirs();
+                       }
+                       while (f.exists());
+                       
+                       assertTrue(f.mkdirs());
                        f.deleteOnExit();
 
                        File child1 = new File(f, "dataFile1.txt");
@@ -342,7 +364,10 @@ public class FileInputFormatTest {
                }
        }
 
-
+       // 
------------------------------------------------------------------------
+       //  Stream Decoration
+       // 
------------------------------------------------------------------------
+       
        @Test
        public void testDecorateInputStream() throws IOException {
                // create temporary file with 3 blocks

Reply via email to