Repository: flink
Updated Branches:
  refs/heads/release-0.8 88c7ea256 -> 9f18cbb3a


[FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo

This closes #413


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f18cbb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f18cbb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f18cbb3

Branch: refs/heads/release-0.8
Commit: 9f18cbb3a680e72a21197406059a420857dee5f8
Parents: 88c7ea2
Author: Robert Metzger <rmetz...@apache.org>
Authored: Tue Feb 17 12:05:05 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Tue Mar 10 14:02:45 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 68 +++++++++++++++++++-
 .../api/java/typeutils/GenericAvroTypeInfo.java | 40 ++++++++++++
 .../api/java/typeutils/GenericTypeInfo.java     | 11 +++-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  2 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 25 ++++++-
 5 files changed, 140 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index b91a3d8..b3321c8 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -22,8 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericAvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -51,7 +55,6 @@ public class AvroPojoTest extends JavaProgramTestBase {
        }
 
        private File inFile;
-       private String expected;
 
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -97,7 +100,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
                return "";
        }
 
-       private static int NUM_PROGRAMS = 3;
+       private static int NUM_PROGRAMS = 5;
 
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -146,6 +149,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
                                in = new Path(inFile.getAbsoluteFile().toURI());
 
                                AvroInputFormat<User> users1 = new 
AvroInputFormat<User>(in, User.class);
+                               Assert.assertTrue(users1.getProducedType() 
instanceof PojoTypeInfo);
                                DataSet<User> usersDS1 = env.createInput(users1)
                                                // null map type because the 
order changes in different JVMs (hard to test)
                                                .map(new MapFunction<User, 
User>() {
@@ -163,7 +167,65 @@ public class AvroPojoTest extends JavaProgramTestBase {
 
                                return "{\"name\": \"Alyssa\", 
\"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, 
\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": 
true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], 
\"type_array_boolean\": [true, false], \"type_nullable_array\": null, 
\"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, 
\"type_union\": null}\n" +
                                                "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null}\n";
+                       case 4:
+                               /**
+                                * Test GenericTypeInfo with Avro serialization.
+                                */
+                               env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               GenericTypeInfo.USE_AVRO_SERIALIZER = true;
+                               in = new Path(inFile.getAbsoluteFile().toURI());
+
+                               AvroInputFormat<User> users2 = new 
AvroInputFormat<User>(in, User.class);
+                               DataSet<User> usersDS2 = 
env.createInput(users2, new GenericTypeInfo<User>(User.class));
+
+                               DataSet<Tuple2<String, Integer>> res2 = 
usersDS2.groupBy(new KeySelector<User, String>() {
+                                       @Override
+                                       public String getKey(User value) throws 
Exception {
+                                               return 
String.valueOf(value.getName());
+                                       }
+                               }).reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
+                                       @Override
+                                       public void reduce(Iterable<User> 
values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (User u : values) {
+                                                       out.collect(new 
Tuple2<String, Integer>(u.getName().toString(), 1));
+                                               }
+                                       }
+                               });
 
+                               res2.writeAsText(resultPath);
+                               env.execute("Avro Key selection");
+
+
+                               return "(Alyssa,1)\n(Charlie,1)\n";
+                       case 5:
+                               /**
+                                * Test GenericAvroTypeInfo with Avro 
serialization.
+                                */
+                               env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               in = new Path(inFile.getAbsoluteFile().toURI());
+
+                               AvroInputFormat<User> users3 = new 
AvroInputFormat<User>(in, User.class);
+                               DataSet<User> usersDS3 = 
env.createInput(users3, new GenericAvroTypeInfo<User>(User.class));
+
+                               DataSet<Tuple2<String, Integer>> res3 = 
usersDS3.groupBy(new KeySelector<User, String>() {
+                                       @Override
+                                       public String getKey(User value) throws 
Exception {
+                                               return 
String.valueOf(value.getName());
+                                       }
+                               }).reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
+                                       @Override
+                                       public void reduce(Iterable<User> 
values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (User u : values) {
+                                                       out.collect(new 
Tuple2<String, Integer>(u.getName().toString(), 1));
+                                               }
+                                       }
+                               });
+
+                               res3.writeAsText(resultPath);
+                               env.execute("Avro Key selection");
+
+
+                               return "(Alyssa,1)\n(Charlie,1)\n";
                        default:
                                throw new RuntimeException("Unknown test");
                }
@@ -176,7 +238,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
        }
 
        @Parameterized.Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+       public static Collection<Object[]> getConfigurations() throws 
IOException {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
new file mode 100644
index 0000000..6e3eebd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+
+
+/**
+ * {@link GenericTypeInfo} variant that always serializes values with the {@link AvroSerializer},
+ * independent of {@code GenericTypeInfo.USE_AVRO_SERIALIZER} (whose default, false, selects Kryo).
+ */
+public class GenericAvroTypeInfo<T> extends GenericTypeInfo<T> {
+       /** @param typeClass the class whose values this type information describes */
+       public GenericAvroTypeInfo(Class<T> typeClass) {
+               super(typeClass);
+       }
+       /** Returns a new {@link AvroSerializer} for the type class, never a {@code KryoSerializer}. */
+       @Override
+       public TypeSerializer<T> createSerializer() {
+               return new AvroSerializer<T>(getTypeClass());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5bc6cb9..30688b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 
@@ -29,9 +30,13 @@ import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 public class GenericTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
 
        private final Class<T> typeClass;
+       public static boolean USE_AVRO_SERIALIZER = false;
+
+       private boolean useAvroSerializer;
 
        public GenericTypeInfo(Class<T> typeClass) {
                this.typeClass = typeClass;
+               this.useAvroSerializer = USE_AVRO_SERIALIZER;
        }
        
        @Override
@@ -66,7 +71,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
        @Override
        public TypeSerializer<T> createSerializer() {
-               return new KryoSerializer<T>(this.typeClass);
+               if(useAvroSerializer) {
+                       return new AvroSerializer<T>(this.typeClass);
+               } else {
+                       return new KryoSerializer<T>(this.typeClass);
+               }
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 0103a7b..30a8df2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 
 import com.google.common.base.Joiner;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 2758bd6..d5573cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -18,8 +18,15 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -91,7 +98,22 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
        @Override
        public T copy(T from) {
                checkKryoInitialized();
-               return this.kryo.copy(from);
+               try {
+                       return this.kryo.copy(from);
+               } catch(KryoException ke) {
+                       // kryo was unable to copy it, so we do it through 
serialization:
+                       ByteArrayOutputStream baout = new 
ByteArrayOutputStream();
+                       Output output = new Output(baout);
+
+                       kryo.writeObject(output, from);
+
+                       output.close();
+
+                       ByteArrayInputStream bain = new 
ByteArrayInputStream(baout.toByteArray());
+                       Input input = new Input(bain);
+
+                       return (T)kryo.readObject(input, from.getClass());
+               }
        }
        
        @Override
@@ -154,6 +176,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
        private void checkKryoInitialized() {
                if (this.kryo == null) {
                        this.kryo = new Kryo();
+                       this.kryo.register(GenericData.Array.class, new 
KryoSerializer.SpecificInstanceCollectionSerializer(ArrayList.class));
                        this.kryo.setAsmEnabled(true);
                        this.kryo.register(type);
                }

Reply via email to