Repository: flink Updated Branches: refs/heads/master c854d5260 -> 16ab1fd97
[FLINK-1981] add support for GZIP files * register decompression algorithms with file extensions for extensibility * fit deflate decompression into this scheme * add support for GZIP files * test support for deflate and GZIP files with the CsvInputFormat * replace Apache Commons' Validate with Guava's Preconditions * add documentation on reading compressed files This closes #762. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16ab1fd9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16ab1fd9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16ab1fd9 Branch: refs/heads/master Commit: 16ab1fd97f5ffc2d77a288b5b561f9cbbbd1f63c Parents: c854d52 Author: Sebastian Kruse <sebastian.kr...@hpi.de> Authored: Tue Jun 2 18:58:35 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Mon Jun 8 10:43:45 2015 +0200 ---------------------------------------------------------------------- docs/apis/programming_guide.md | 33 ++++++ .../flink/api/common/io/FileInputFormat.java | 79 ++++++++++++-- .../io/InflaterInputStreamFSInputWrapper.java | 32 ------ .../DeflateInflaterInputStreamFactory.java | 49 +++++++++ .../GzipInflaterInputStreamFactory.java | 48 +++++++++ .../compression/InflaterInputStreamFactory.java | 44 ++++++++ .../common/io/GenericCsvInputFormatTest.java | 105 +++++++++++++++++++ 7 files changed, 351 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/docs/apis/programming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md index 517781f..17903a9 100644 --- a/docs/apis/programming_guide.md +++ b/docs/apis/programming_guide.md @@ -1880,6 +1880,39 @@ env.readTextFile("file:///path/with.nested/files").withParameters(parameters) </div> </div> + +### Read Compressed 
Files + +Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary and any `FileInputFormat` supports the compression, including custom input formats. Please note that compressed files might not be read in parallel, thus impacting job scalability. + +The following table lists the currently supported compression methods. + +<br /> + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Compression method</th> + <th class="text-left">File extensions</th> + <th class="text-left" style="width: 20%">Parallelizable</th> + </tr> + </thead> + + <tbody> + <tr> + <td><strong>DEFLATE</strong></td> + <td>`.deflate`</td> + <td>no</td> + </tr> + <tr> + <td><strong>GZip</strong></td> + <td>`.gz`, `.gzip`</td> + <td>no</td> + </tr> + </tbody> +</table> + + [Back to top](#top) http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index cdc408d..0584b96 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -21,10 +21,16 @@ package org.apache.flink.api.common.io; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory; +import 
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory; +import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; @@ -68,10 +74,11 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp private static long DEFAULT_OPENING_TIMEOUT; /** - * Files with that suffix are unsplittable at a file level - * and compressed. + * A mapping of file extensions to decompression algorithms based on DEFLATE. Such compressions lead to + * unsplittable files. */ - protected static final String DEFLATE_SUFFIX = ".deflate"; + protected static final Map<String, InflaterInputStreamFactory<?>> INFLATER_INPUT_STREAM_FACTORIES = + new HashMap<String, InflaterInputStreamFactory<?>>(); /** * The splitLength is set to -1L for reading the whole split. @@ -80,6 +87,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp static { initDefaultsFromConfiguration(); + initDefaultInflaterInputStreamFactories(); } private static void initDefaultsFromConfiguration() { @@ -96,6 +104,52 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp DEFAULT_OPENING_TIMEOUT = to; } } + + private static void initDefaultInflaterInputStreamFactories() { + InflaterInputStreamFactory<?>[] defaultFactories = { + DeflateInflaterInputStreamFactory.getInstance(), + GzipInflaterInputStreamFactory.getInstance() + }; + for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) { + for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) { + registerInflaterInputStreamFactory(fileExtension, inputStreamFactory); + } + } + } + + /** + * Registers a decompression algorithm through a {@link org.apache.flink.api.common.io.compression.InflaterInputStreamFactory} + * with a file extension for transparent decompression. 
+ * @param fileExtension of the compressed files + * @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format + */ + public static void registerInflaterInputStreamFactory(String fileExtension, InflaterInputStreamFactory<?> factory) { + synchronized (INFLATER_INPUT_STREAM_FACTORIES) { + if (INFLATER_INPUT_STREAM_FACTORIES.put(fileExtension, factory) != null) { + LOG.warn("Overwriting an existing decompression algorithm for \"{}\" files.", fileExtension); + } + } + } + + protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(String fileExtension) { + synchronized (INFLATER_INPUT_STREAM_FACTORIES) { + return INFLATER_INPUT_STREAM_FACTORIES.get(fileExtension); + } + } + + /** + * Returns the extension of a file name (!= a path). + * @return the extension of the file name or {@code null} if there is no extension. + */ + protected static String extractFileExtension(String fileName) { + Preconditions.checkNotNull(fileName); + int lastPeriodIndex = fileName.lastIndexOf('.'); + if (lastPeriodIndex < 0){ + return null; + } else { + return fileName.substring(lastPeriodIndex + 1); + } + } static long getDefaultOpeningTimeout() { return DEFAULT_OPENING_TIMEOUT; @@ -533,13 +587,23 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp } protected boolean testForUnsplittable(FileStatus pathFile) { - if(pathFile.getPath().getName().endsWith(DEFLATE_SUFFIX)) { + if(getInflaterInputStreamFactory(pathFile.getPath()) != null) { unsplittable = true; return true; } return false; } + private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) { + String fileExtension = extractFileExtension(path.getName()); + if (fileExtension != null) { + return getInflaterInputStreamFactory(fileExtension); + } else { + return null; + } + + } + /** * A simple hook to filter files and directories from the input. * The method may be overridden. 
Hadoop's FileInputFormat has a similar mechanism and applies the @@ -633,9 +697,10 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp * @see org.apache.flink.api.common.io.InputStreamFSInputWrapper */ protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { - // Wrap stream in a extracting (decompressing) stream if file ends with .deflate. - if (fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) { - return new InflaterInputStreamFSInputWrapper(stream); + // Wrap stream in a extracting (decompressing) stream if file ends with a known compression file extension. + InflaterInputStreamFactory<?> inflaterInputStreamFactory = getInflaterInputStreamFactory(fileSplit.getPath()); + if (inflaterInputStreamFactory != null) { + return new InputStreamFSInputWrapper(inflaterInputStreamFactory.create(stream)); } return inputStream; http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java deleted file mode 100644 index ac87535..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.io; - -import org.apache.flink.core.fs.FSDataInputStream; - -import java.util.zip.InflaterInputStream; - -public class InflaterInputStreamFSInputWrapper extends InputStreamFSInputWrapper { - - public InflaterInputStreamFSInputWrapper(FSDataInputStream inStream) { - super(new InflaterInputStream(inStream)); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java new file mode 100644 index 0000000..20c79db --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io.compression; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.zip.InflaterInputStream; + +/** + * Factory for input streams that decompress the "deflate" compression format. + */ +public class DeflateInflaterInputStreamFactory implements InflaterInputStreamFactory<InflaterInputStream> { + + private static DeflateInflaterInputStreamFactory INSTANCE = null; + + public static DeflateInflaterInputStreamFactory getInstance() { + if (INSTANCE == null) { + INSTANCE = new DeflateInflaterInputStreamFactory(); + } + return INSTANCE; + } + + @Override + public InflaterInputStream create(InputStream in) throws IOException { + return new InflaterInputStream(in); + } + + @Override + public Collection<String> getCommonFileExtensions() { + return Collections.singleton("deflate"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java new file mode 100644 index 0000000..aebbac3 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io.compression; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.zip.GZIPInputStream; + +/** + * Factory for input streams that decompress the GZIP compression format. 
+ */ +public class GzipInflaterInputStreamFactory implements InflaterInputStreamFactory<GZIPInputStream> { + + private static GzipInflaterInputStreamFactory INSTANCE = null; + + public static GzipInflaterInputStreamFactory getInstance() { + if (INSTANCE == null) { + INSTANCE = new GzipInflaterInputStreamFactory(); + } + return INSTANCE; + } + @Override + public GZIPInputStream create(InputStream in) throws IOException { + return new GZIPInputStream(in); + } + + @Override + public Collection<String> getCommonFileExtensions() { + return Arrays.asList("gz", "gzip"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java new file mode 100644 index 0000000..a6787c5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io.compression; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.zip.InflaterInputStream; + +/** + * Creates a new instance of a certain subclass of {@link java.util.zip.InflaterInputStream}. + */ +public interface InflaterInputStreamFactory<T extends InflaterInputStream> { + + /** + * Creates a {@link java.util.zip.InflaterInputStream} that wraps the given input stream. + * @param in is the compressed input stream + * @return the inflated input stream + */ + T create(InputStream in) throws IOException; + + /** + * Lists a collection of typical file extensions (e.g., "gz", "gzip") that are associated with the compression + * algorithm in the {@link java.util.zip.InflaterInputStream} {@code T}. + * @return a (possibly empty) collection of lower-case file extensions, without the period + */ + Collection<String> getCommonFileExtensions(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index 3749645..20b130c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -30,6 +30,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Arrays; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPOutputStream; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; @@ 
-39,6 +41,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; +import org.jets3t.service.io.GZipDeflatingInputStream; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -119,6 +122,86 @@ public class GenericCsvInputFormatTest { fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); } } + + @Test + public void testReadNoPosAllDeflate() throws IOException { + try { + final String fileContent = "111|222|333|444|555\n666|777|888|999|000|"; + final FileInputSplit split = createTempDeflateFile(fileContent); + + final Configuration parameters = new Configuration(); + + format.setFieldDelimiter("|"); + format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class); + + format.configure(parameters); + format.open(split); + + Value[] values = createIntValues(5); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(111, ((IntValue) values[0]).getValue()); + assertEquals(222, ((IntValue) values[1]).getValue()); + assertEquals(333, ((IntValue) values[2]).getValue()); + assertEquals(444, ((IntValue) values[3]).getValue()); + assertEquals(555, ((IntValue) values[4]).getValue()); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(666, ((IntValue) values[0]).getValue()); + assertEquals(777, ((IntValue) values[1]).getValue()); + assertEquals(888, ((IntValue) values[2]).getValue()); + assertEquals(999, ((IntValue) values[3]).getValue()); + assertEquals(000, ((IntValue) values[4]).getValue()); + + assertNull(format.nextRecord(values)); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + } + + @Test + public void testReadNoPosAllGzip() throws IOException { + try { + final String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|"; + final FileInputSplit split = createTempGzipFile(fileContent); + + final Configuration parameters = new Configuration(); + + format.setFieldDelimiter("|"); + format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class); + + format.configure(parameters); + format.open(split); + + Value[] values = createIntValues(5); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(111, ((IntValue) values[0]).getValue()); + assertEquals(222, ((IntValue) values[1]).getValue()); + assertEquals(333, ((IntValue) values[2]).getValue()); + assertEquals(444, ((IntValue) values[3]).getValue()); + assertEquals(555, ((IntValue) values[4]).getValue()); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(666, ((IntValue) values[0]).getValue()); + assertEquals(777, ((IntValue) values[1]).getValue()); + assertEquals(888, ((IntValue) values[2]).getValue()); + assertEquals(999, ((IntValue) values[3]).getValue()); + assertEquals(000, ((IntValue) values[4]).getValue()); + + assertNull(format.nextRecord(values)); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + } @Test public void testReadNoPosFirstN() throws IOException { @@ -584,6 +667,28 @@ public class GenericCsvInputFormatTest { return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); } + + private FileInputSplit createTempDeflateFile(String content) throws IOException { + this.tempFile = File.createTempFile("test_contents", "tmp.deflate"); + this.tempFile.deleteOnExit(); + + DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile))); + dos.writeBytes(content); + dos.close(); + + return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, 
this.tempFile.length(), new String[] {"localhost"}); + } + + private FileInputSplit createTempGzipFile(String content) throws IOException { + this.tempFile = File.createTempFile("test_contents", "tmp.gz"); + this.tempFile.deleteOnExit(); + + DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile))); + dos.writeBytes(content); + dos.close(); + + return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + } private final Value[] createIntValues(int num) { Value[] v = new Value[num];