[FLINK-4771] [avro] Add support for compression to AvroOutputFormat.

This closes #2612


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dffde7ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dffde7ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dffde7ef

Branch: refs/heads/master
Commit: dffde7efb3c60da83e54f4202004d5d70e174e8f
Parents: dc99deb
Author: larsbachmann <lars.bachm...@posteo.de>
Authored: Sat Oct 8 13:30:14 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat Oct 15 07:59:51 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  66 ++++++++
 .../flink/api/avro/AvroOutputFormatITCase.java  |   1 +
 .../flink/api/java/io/AvroOutputFormatTest.java | 154 +++++++++++++++++++
 3 files changed, 221 insertions(+)
----------------------------------------------------------------------
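For reference, a minimal usage sketch of the API added here (not part of this commit; the output path is illustrative, and the User record type is the Avro example type used by the flink-avro tests):

    import org.apache.flink.api.io.avro.example.User;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroOutputFormat;

    public class CompressedAvroWriteExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<User> users = env.fromElements(new User("testUser", 1, "blue"));

            AvroOutputFormat<User> format = new AvroOutputFormat<>(User.class);
            format.setSchema(User.SCHEMA$);
            // New in this commit: pick one of NULL, SNAPPY, BZIP2, DEFLATE, XZ.
            format.setCodec(AvroOutputFormat.Codec.SNAPPY);

            users.write(format, "/tmp/users.avro"); // illustrative path
            env.execute();
        }
    }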


http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 4d8313c..600d1e5 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
@@ -26,16 +27,58 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.core.fs.Path;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.io.Serializable;
 
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
+       /**
+        * Wrapper which encapsulates the supported codec and a related serialization byte.
+        */
+       public enum Codec {
+
+               NULL((byte)0, CodecFactory.nullCodec()),
+               SNAPPY((byte)1, CodecFactory.snappyCodec()),
+               BZIP2((byte)2, CodecFactory.bzip2Codec()),
+               DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+               XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+               private byte codecByte;
+
+               private CodecFactory codecFactory;
+
+               Codec(final byte codecByte, final CodecFactory codecFactory) {
+                       this.codecByte = codecByte;
+                       this.codecFactory = codecFactory;
+               }
+
+               private byte getCodecByte() {
+                       return codecByte;
+               }
+
+               private CodecFactory getCodecFactory() {
+                       return codecFactory;
+               }
+
+               private static Codec forCodecByte(byte codecByte) {
+                       for (final Codec codec : Codec.values()) {
+                               if (codec.getCodecByte() == codecByte) {
+                                       return codec;
+                               }
+                       }
+                       throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+               }
+       }
+
        private static final long serialVersionUID = 1L;
 
        private final Class<E> avroValueType;
 
        private transient Schema userDefinedSchema = null;
+
+       private transient Codec codec = null;
        
        private transient DataFileWriter<E> dataFileWriter;
 
@@ -57,6 +100,15 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
                this.userDefinedSchema = schema;
        }
 
+       /**
+        * Set avro codec for compression.
+        *
+        * @param codec avro codec.
+        */
+       public void setCodec(final Codec codec) {
+               this.codec = checkNotNull(codec, "codec can not be null");
+       }
+
        @Override
        public void writeRecord(E record) throws IOException {
                dataFileWriter.append(record);
@@ -82,6 +134,9 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
                        schema = ReflectData.get().getSchema(avroValueType);
                }
                dataFileWriter = new DataFileWriter<E>(datumWriter);
+               if (codec != null) {
+                       dataFileWriter.setCodec(codec.getCodecFactory());
+               }
                if (userDefinedSchema == null) {
                        dataFileWriter.create(schema, stream);
                } else {
@@ -92,6 +147,12 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
        private void writeObject(java.io.ObjectOutputStream out) throws IOException {
                out.defaultWriteObject();
 
+               if (codec != null) {
+                       out.writeByte(codec.getCodecByte());
+               } else {
+                       out.writeByte(-1);
+               }
+
                if(userDefinedSchema != null) {
                        byte[] json = userDefinedSchema.toString().getBytes();
                        out.writeInt(json.length);
@@ -104,6 +165,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
        private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
                in.defaultReadObject();
 
+               byte codecByte = in.readByte();
+               if (codecByte >= 0) {
+                       setCodec(Codec.forCodecByte(codecByte));
+               }
+
                int length = in.readInt();
                if(length != 0) {
                        byte[] json = new byte[length];

http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index adbe5dd..3b01ccb 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -69,6 +69,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
                //output the data with AvroOutputFormat for specific user type
                DataSet<User> specificUser = input.map(new ConvertToUser());
                AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+               avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
                avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
                specificUser.write(avroOutputFormat, outputPath1);
 

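Nothing changes on the read side: the codec is resolved from the container file header during reading, so decompression is transparent to consumers such as AvroInputFormat from the same module. A sketch, assuming the compressed file written in the example above:

    import org.apache.flink.api.io.avro.example.User;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class ReadBackExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // No codec needs to be configured for reading; Avro picks it up
            // from the file metadata written by DataFileWriter#setCodec.
            DataSet<User> users = env.createInput(
                    new AvroInputFormat<>(new Path("/tmp/users.avro"), User.class));
            users.print();
        }
    }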
http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
new file mode 100644
index 0000000..4d6c6b7
--- /dev/null
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
+ * Tests for {@link AvroOutputFormat}
+ */
+public class AvroOutputFormatTest {
+
+       @Test
+       public void testSetCodec() throws Exception {
+               // given
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+               // when
+               try {
+                       outputFormat.setCodec(Codec.SNAPPY);
+               } catch (Exception ex) {
+                       // then
+                       fail("unexpected exception");
+               }
+       }
+
+       @Test
+       public void testSetCodecError() throws Exception {
+               // given
+               boolean error = false;
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+               // when
+               try {
+                       outputFormat.setCodec(null);
+               } catch (Exception ex) {
+                       error = true;
+               }
+
+               // then
+               assertTrue(error);
+       }
+
+       @Test
+       public void testSerialization() throws Exception {
+
+               serializeAndDeserialize(null, null);
+               serializeAndDeserialize(null, User.SCHEMA$);
+               for (final Codec codec : Codec.values()) {
+                       serializeAndDeserialize(codec, null);
+                       serializeAndDeserialize(codec, User.SCHEMA$);
+               }
+       }
+
+       private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
+               // given
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+               if (codec != null) {
+                       outputFormat.setCodec(codec);
+               }
+               if (schema != null) {
+                       outputFormat.setSchema(schema);
+               }
+
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+               // when
+               try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+                       oos.writeObject(outputFormat);
+               }
+               try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+                       // then
+                       Object o = ois.readObject();
+                       assertTrue(o instanceof AvroOutputFormat);
+                       final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
+                       final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
+                       final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
+
+                       assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
+                       assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
+               }
+       }
+
+       @Test
+       public void testCompression() throws Exception {
+               // given
+               final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
+               final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+               outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+               final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
+               final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+               compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+               compressedOutputFormat.setCodec(Codec.SNAPPY);
+
+               // when
+               output(outputFormat);
+               output(compressedOutputFormat);
+
+               // then
+               assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+
+               // cleanup
+               Files.delete(Paths.get(outputPath.getPath()));
+               Files.delete(Paths.get(compressedOutputPath.getPath()));
+       }
+
+       private long fileSize(Path path) throws IOException {
+               return Files.size(Paths.get(path.getPath()));
+       }
+
+       private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
+               outputFormat.configure(new Configuration());
+               outputFormat.open(1,1);
+               for (int i = 0; i < 100; i++) {
+                       outputFormat.writeRecord(new User("testUser",1,"blue"));
+               }
+               outputFormat.close();
+       }
+}
