[FLINK-4771] [avro] Add support for compression to AvroOutputFormat. This closes #2612
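A short usage sketch of the new API (illustrative only; the `users` DataSet and the output path are placeholders, not part of this commit):

    // enable compression on the AvroOutputFormat added by this change
    AvroOutputFormat<User> format = new AvroOutputFormat<>(User.class);
    format.setCodec(AvroOutputFormat.Codec.SNAPPY); // NULL, SNAPPY, BZIP2, DEFLATE, XZ are supported
    users.write(format, "/tmp/users.avro"); // 'users' is an assumed DataSet<User>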
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dffde7ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dffde7ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dffde7ef

Branch: refs/heads/master
Commit: dffde7efb3c60da83e54f4202004d5d70e174e8f
Parents: dc99deb
Author: larsbachmann <lars.bachm...@posteo.de>
Authored: Sat Oct 8 13:30:14 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat Oct 15 07:59:51 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  66 ++++++++
 .../flink/api/avro/AvroOutputFormatITCase.java  |   1 +
 .../flink/api/java/io/AvroOutputFormatTest.java | 154 +++++++++++++++++++
 3 files changed, 221 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 4d8313c..600d1e5 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
@@ -26,16 +27,58 @@
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.core.fs.Path;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.io.Serializable;
 
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
+	/**
+	 * Wrapper which encapsulates the supported codec and a related serialization byte.
+	 */
+	public enum Codec {
+
+		NULL((byte)0, CodecFactory.nullCodec()),
+		SNAPPY((byte)1, CodecFactory.snappyCodec()),
+		BZIP2((byte)2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		private byte codecByte;
+
+		private CodecFactory codecFactory;
+
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	private final Class<E> avroValueType;
 
 	private transient Schema userDefinedSchema = null;
+
+	private transient Codec codec = null;
 
 	private transient DataFileWriter<E> dataFileWriter;
 
@@ -57,6 +100,15 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		this.userDefinedSchema = schema;
 	}
 
+	/**
+	 * Set avro codec for compression.
+	 *
+	 * @param codec avro codec.
+	 */
+	public void setCodec(final Codec codec) {
+		this.codec = checkNotNull(codec, "codec can not be null");
+	}
+
 	@Override
 	public void writeRecord(E record) throws IOException {
 		dataFileWriter.append(record);
@@ -82,6 +134,9 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			schema = ReflectData.get().getSchema(avroValueType);
 		}
 		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (codec != null) {
+			dataFileWriter.setCodec(codec.getCodecFactory());
+		}
 		if (userDefinedSchema == null) {
 			dataFileWriter.create(schema, stream);
 		} else {
@@ -92,6 +147,12 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
 
+		if (codec != null) {
+			out.writeByte(codec.getCodecByte());
+		} else {
+			out.writeByte(-1);
+		}
+
 		if(userDefinedSchema != null) {
 			byte[] json = userDefinedSchema.toString().getBytes();
 			out.writeInt(json.length);
@@ -104,6 +165,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
 
+		byte codecByte = in.readByte();
+		if (codecByte >= 0) {
+			setCodec(Codec.forCodecByte(codecByte));
+		}
+
 		int length = in.readInt();
 		if(length != 0) {
 			byte[] json = new byte[length];

http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index adbe5dd..3b01ccb 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -69,6 +69,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 
 		//output the data with AvroOutputFormat for specific user type
 		DataSet<User> specificUser = input.map(new ConvertToUser());
 		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
 		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
 		specificUser.write(avroOutputFormat, outputPath1);

http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
new file mode 100644
index 0000000..4d6c6b7
--- /dev/null
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
+ * Tests for {@link AvroOutputFormat}
+ */
+public class AvroOutputFormatTest {
+
+	@Test
+	public void testSetCodec() throws Exception {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when
+		try {
+			outputFormat.setCodec(Codec.SNAPPY);
+		} catch (Exception ex) {
+			// then
+			fail("unexpected exception");
+		}
+	}
+
+	@Test
+	public void testSetCodecError() throws Exception {
+		// given
+		boolean error = false;
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when
+		try {
+			outputFormat.setCodec(null);
+		} catch (Exception ex) {
+			error = true;
+		}
+
+		// then
+		assertTrue(error);
+	}
+
+	@Test
+	public void testSerialization() throws Exception {
+
+		serializeAndDeserialize(null, null);
+		serializeAndDeserialize(null, User.SCHEMA$);
+		for (final Codec codec : Codec.values()) {
+			serializeAndDeserialize(codec, null);
+			serializeAndDeserialize(codec, User.SCHEMA$);
+		}
+	}
+
+	private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+		if (codec != null) {
+			outputFormat.setCodec(codec);
+		}
+		if (schema != null) {
+			outputFormat.setSchema(schema);
+		}
+
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+		// when
+		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+			oos.writeObject(outputFormat);
+		}
+		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+			// then
+			Object o = ois.readObject();
+			assertTrue(o instanceof AvroOutputFormat);
+			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
+			final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
+			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
+
+			assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
+			assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
+		}
+	}
+
+	@Test
+	public void testCompression() throws Exception {
+		// given
+		final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
+		final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+		compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		compressedOutputFormat.setCodec(Codec.SNAPPY);
+
+		// when
+		output(outputFormat);
+		output(compressedOutputFormat);
+
+		// then
+		assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+
+		// cleanup
+		Files.delete(Paths.get(outputPath.getPath()));
+		Files.delete(Paths.get(compressedOutputPath.getPath()));
+	}
+
+	private long fileSize(Path path) throws IOException {
+		return Files.size(Paths.get(path.getPath()));
+	}
+
+	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
+		outputFormat.configure(new Configuration());
+		outputFormat.open(1,1);
+		for (int i = 0; i < 100; i++) {
+			outputFormat.writeRecord(new User("testUser",1,"blue"));
+		}
+		outputFormat.close();
+	}
+}
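
For reference, the codec a written file actually used can be checked through the Avro container metadata ("avro.codec" key). A minimal sketch, assuming a file produced by the format above; the path and class name are placeholders:

    import java.io.File;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class CodecCheck {
        public static void main(String[] args) throws Exception {
            // Avro container files record the codec name in their metadata block
            try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                    new File("/tmp/users.avro"), new GenericDatumReader<GenericRecord>())) {
                System.out.println(reader.getMetaString("avro.codec")); // e.g. "snappy"
            }
        }
    }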