[core] Minor cleanups in FileInputFormat and DelimitedInputFormat
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efa62df8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efa62df8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efa62df8 Branch: refs/heads/master Commit: efa62df879c608daa1750797e2a34ffa928592f9 Parents: 3d5d63f Author: Stephan Ewen <[email protected]> Authored: Sun Jul 12 20:26:20 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 15:14:41 2015 +0200 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 9 +- .../flink/api/common/io/FileInputFormat.java | 12 ++- .../api/common/io/DelimitedInputFormatTest.java | 98 ++++++++++---------- .../api/common/io/FileInputFormatTest.java | 39 ++++++-- 4 files changed, 91 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index d2b4e83..7fc42ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.io; import java.io.IOException; @@ -40,8 +39,8 @@ import com.google.common.base.Charsets; * Base implementation for input formats that split the input at a delimiter into records. * The parsing of the record bytes into the record has to be implemented in the * {@link #readRecord(Object, byte[], int, int)} method. 
- * <p> - * The default delimiter is the newline character {@code '\n'}. + * + * <p>The default delimiter is the newline character {@code '\n'}.</p> */ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { @@ -495,7 +494,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { } int startPos = this.readPos; - int count = 0; + int count; while (this.readPos < this.limit && i < this.delimiter.length) { if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) { @@ -559,7 +558,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { private boolean fillBuffer() throws IOException { // special case for reading the whole split. - if(this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) { + if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) { int read = this.stream.read(this.readBuffer, 0, readBuffer.length); if (read == -1) { this.stream.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 0584b96..c9c9ec1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -47,12 +47,12 @@ import org.apache.flink.core.fs.Path; /** * The base class for {@link InputFormat}s that read from files. For specific input types the - * <tt>nextRecord()</tt> and <tt>reachedEnd()</tt> methods need to be implemented. + * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented. * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to * change the life cycle behavior. 
- * <p> - * After the {@link #open(FileInputSplit)} method completed, the file input data is available - * from the {@link #stream} field. + * + * <p>After the {@link #open(FileInputSplit)} method completed, the file input data is available + * from the {@link #stream} field.</p> */ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSplit> { @@ -245,6 +245,8 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp // TODO The job-submission web interface passes empty args (and thus empty // paths) to compute the preview graph. The following is a workaround for // this situation and we should fix this. + + // comment (Stephan Ewen) this should be no longer relevant with the current Java/Scala APIs. if (filePath.isEmpty()) { setFilePath(new Path()); return; @@ -609,7 +611,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the * same filters by default. * - * @param fileStatus + * @param fileStatus The file status to check. 
* @return true, if the given file or directory is accepted */ protected boolean acceptFile(FileStatus fileStatus) { http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 9489f16..4af394f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -37,8 +38,7 @@ import java.io.OutputStreamWriter; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -49,7 +49,7 @@ public class DelimitedInputFormatTest { protected File tempFile; - private final DelimitedInputFormat<Record> format = new MyTextInputFormat(); + private final DelimitedInputFormat<String> format = new MyTextInputFormat(); // -------------------------------------------------------------------------------------------- @@ -90,7 +90,7 @@ public class DelimitedInputFormatTest { final int LINE_LENGTH_LIMIT = 12345; final int BUFFER_SIZE = 178; - DelimitedInputFormat<Record> format = new MyTextInputFormat(); + DelimitedInputFormat<String> format = new MyTextInputFormat(); 
format.setDelimiter(DELIMITER); format.setNumLineSamples(NUM_LINE_SAMPLES); format.setLineLengthLimit(LINE_LENGTH_LIMIT); @@ -104,7 +104,7 @@ public class DelimitedInputFormatTest { ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())); @SuppressWarnings("unchecked") - DelimitedInputFormat<Record> deserialized = (DelimitedInputFormat<Record>) ois.readObject(); + DelimitedInputFormat<String> deserialized = (DelimitedInputFormat<String>) ois.readObject(); assertEquals(NUM_LINE_SAMPLES, deserialized.getNumLineSamples()); assertEquals(LINE_LENGTH_LIMIT, deserialized.getLineLengthLimit()); @@ -115,7 +115,7 @@ public class DelimitedInputFormatTest { @Test public void testOpen() throws IOException { final String myString = "my mocked line 1\nmy mocked line 2\n"; - final FileInputSplit split = createTempFile(myString); + final FileInputSplit split = createTempFile(myString); int bufferSize = 5; format.setBufferSize(bufferSize); @@ -125,35 +125,42 @@ public class DelimitedInputFormatTest { assertEquals(bufferSize, format.getBufferSize()); } + /** + * Tests simple delimited parsing with a custom delimiter. 
+ */ @Test - public void testRead() throws IOException { - final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; - final FileInputSplit split = createTempFile(myString); - - final Configuration parameters = new Configuration(); - - format.setDelimiter("$$$"); - format.configure(parameters); - format.open(split); - - Record theRecord = new Record(); + public void testRead() { + try { + final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; + final FileInputSplit split = createTempFile(myString); + + final Configuration parameters = new Configuration(); + + format.setDelimiter("$$$"); + format.configure(parameters); + format.open(split); + + String first = format.nextRecord(null); + assertNotNull(first); + assertEquals("my key|my val", first); - assertNotNull(format.nextRecord(theRecord)); - assertEquals("my key", theRecord.getField(0, StringValue.class).getValue()); - assertEquals("my val", theRecord.getField(1, StringValue.class).getValue()); - - assertNotNull(format.nextRecord(theRecord)); - assertEquals("my key2\n$$ctd.$$", theRecord.getField(0, StringValue.class).getValue()); - assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue()); - - assertNull(format.nextRecord(theRecord)); - assertTrue(format.reachedEnd()); + String second = format.nextRecord(null); + assertNotNull(second); + assertEquals("my key2\n$$ctd.$$|my value2", second); + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testRead2() throws IOException { // 2. 
test case - final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; + final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2\n"; final FileInputSplit split = createTempFile(myString); final Configuration parameters = new Configuration(); @@ -162,20 +169,20 @@ public class DelimitedInputFormatTest { format.configure(parameters); format.open(split); - Record theRecord = new Record(); - - assertNotNull(format.nextRecord(theRecord)); - assertEquals("my key", theRecord.getField(0, StringValue.class).getValue()); - assertEquals("my val$$$my key2", theRecord.getField(1, StringValue.class).getValue()); + String first = format.nextRecord(null); + String second = format.nextRecord(null); - assertNotNull(format.nextRecord(theRecord)); - assertEquals("$$ctd.$$", theRecord.getField(0, StringValue.class).getValue()); - assertEquals("my value2", theRecord.getField(1, StringValue.class).getValue()); + assertNotNull(first); + assertNotNull(second); - assertNull(format.nextRecord(theRecord)); + assertEquals("my key|my val$$$my key2", first); + assertEquals("$$ctd.$$|my value2", second); + + assertNull(format.nextRecord(null)); assertTrue(format.reachedEnd()); } + private FileInputSplit createTempFile(String contents) throws IOException { this.tempFile = File.createTempFile("test_contents", "tmp"); this.tempFile.deleteOnExit(); @@ -187,22 +194,13 @@ public class DelimitedInputFormatTest { return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); } - protected static final class MyTextInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> { + + protected static final class MyTextInputFormat extends DelimitedInputFormat<String> { private static final long serialVersionUID = 1L; - private final StringValue str1 = new StringValue(); - private final StringValue str2 = new StringValue(); - @Override - public Record readRecord(Record reuse, byte[] bytes, int offset, 
int numBytes) { - String theRecord = new String(bytes, offset, numBytes); - - str1.setValue(theRecord.substring(0, theRecord.indexOf('|'))); - str2.setValue(theRecord.substring(theRecord.indexOf('|') + 1)); - - reuse.setField(0, str1); - reuse.setField(1, str2); - return reuse; + public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) { + return new String(bytes, offset, numBytes); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/efa62df8/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index 7e3edc0..5aac540 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.testutils.TestFileUtils; import org.apache.flink.types.IntValue; + import org.junit.Assert; import org.junit.Test; @@ -37,8 +38,17 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import static org.junit.Assert.*; + +/** + * Tests for the FileInputFormat + */ public class FileInputFormatTest { + // ------------------------------------------------------------------------ + // Statistics + // ------------------------------------------------------------------------ + @Test public void testGetStatisticsNonExistingFile() { try { @@ -189,6 +199,10 @@ public class FileInputFormatTest { Assert.fail(ex.getMessage()); } } + + // ------------------------------------------------------------------------ + // Unsplittable input files + // ------------------------------------------------------------------------ // ---- 
Tests for .deflate --------- @@ -234,6 +248,10 @@ public class FileInputFormatTest { Assert.fail(ex.getMessage()); } } + + // ------------------------------------------------------------------------ + // Ignored Files + // ------------------------------------------------------------------------ @Test public void testIgnoredUnderscoreFiles() { @@ -243,11 +261,13 @@ public class FileInputFormatTest { // create some accepted, some ignored files File tempDir = new File(System.getProperty("java.io.tmpdir")); - File f = null; + File f; do { f = new File(tempDir, TestFileUtils.randomFileName("")); - } while (f.exists()); - f.mkdirs(); + } + while (f.exists()); + + assertTrue(f.mkdirs()); f.deleteOnExit(); File child1 = new File(f, "dataFile1.txt"); @@ -301,11 +321,13 @@ public class FileInputFormatTest { // create two accepted and two ignored files File tempDir = new File(System.getProperty("java.io.tmpdir")); - File f = null; + File f; do { f = new File(tempDir, TestFileUtils.randomFileName("")); - } while (f.exists()); - f.mkdirs(); + } + while (f.exists()); + + assertTrue(f.mkdirs()); f.deleteOnExit(); File child1 = new File(f, "dataFile1.txt"); @@ -342,7 +364,10 @@ public class FileInputFormatTest { } } - + // ------------------------------------------------------------------------ + // Stream Decoration + // ------------------------------------------------------------------------ + @Test public void testDecorateInputStream() throws IOException { // create temporary file with 3 blocks
