Repository: flink Updated Branches: refs/heads/release-0.8 88c7ea256 -> 9f18cbb3a
[FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo This closes #413 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f18cbb3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f18cbb3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f18cbb3 Branch: refs/heads/release-0.8 Commit: 9f18cbb3a680e72a21197406059a420857dee5f8 Parents: 88c7ea2 Author: Robert Metzger <rmetz...@apache.org> Authored: Tue Feb 17 12:05:05 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Mar 10 14:02:45 2015 +0100 ---------------------------------------------------------------------- .../apache/flink/api/io/avro/AvroPojoTest.java | 68 +++++++++++++++++++- .../api/java/typeutils/GenericAvroTypeInfo.java | 40 ++++++++++++ .../api/java/typeutils/GenericTypeInfo.java | 11 +++- .../flink/api/java/typeutils/PojoTypeInfo.java | 2 +- .../java/typeutils/runtime/AvroSerializer.java | 25 ++++++- 5 files changed, 140 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java index b91a3d8..b3321c8 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -22,8 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.io.avro.generated.User; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.io.AvroInputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.GenericAvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.test.util.JavaProgramTestBase; @@ -51,7 +55,6 @@ public class AvroPojoTest extends JavaProgramTestBase { } private File inFile; - private String expected; @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -97,7 +100,7 @@ public class AvroPojoTest extends JavaProgramTestBase { return ""; } - private static int NUM_PROGRAMS = 3; + private static int NUM_PROGRAMS = 5; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -146,6 +149,7 @@ public class AvroPojoTest extends JavaProgramTestBase { in = new Path(inFile.getAbsoluteFile().toURI()); AvroInputFormat<User> users1 = new AvroInputFormat<User>(in, User.class); + Assert.assertTrue(users1.getProducedType() instanceof PojoTypeInfo); DataSet<User> usersDS1 = env.createInput(users1) // null map type because the order changes in different JVMs (hard to test) .map(new MapFunction<User, User>() { @@ -163,7 +167,65 @@ public class AvroPojoTest extends JavaProgramTestBase { return "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + case 4: + /** + * Test GenericTypeInfo with Avro serialization. + */ + env = ExecutionEnvironment.getExecutionEnvironment(); + GenericTypeInfo.USE_AVRO_SERIALIZER = true; + in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users2 = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS2 = env.createInput(users2, new GenericTypeInfo<User>(User.class)); + + DataSet<Tuple2<String, Integer>> res2 = usersDS2.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + res2.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + return "(Alyssa,1)\n(Charlie,1)\n"; + case 5: + /** + * Test GenericAvroTypeInfo with Avro serialization. + */ + env = ExecutionEnvironment.getExecutionEnvironment(); + in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users3 = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS3 = env.createInput(users3, new GenericAvroTypeInfo<User>(User.class)); + + DataSet<Tuple2<String, Integer>> res3 = usersDS3.groupBy(new KeySelector<User, String>() { + @Override + public String getKey(User value) throws Exception { + return String.valueOf(value.getName()); + } + }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); + + res3.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + return "(Alyssa,1)\n(Charlie,1)\n"; default: throw new RuntimeException("Unknown test"); } @@ -176,7 +238,7 @@ public class AvroPojoTest extends JavaProgramTestBase { } @Parameterized.Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + public static Collection<Object[]> getConfigurations() throws IOException { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java new file mode 100644 index 0000000..6e3eebd --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; + + +/** + * TypeInformation like the regular GenericTypeInfo (kryo) but enforcing + * the AvroSerializer. + */ +public class GenericAvroTypeInfo<T> extends GenericTypeInfo<T> { + + public GenericAvroTypeInfo(Class<T> typeClass) { + super(typeClass); + } + + @Override + public TypeSerializer<T> createSerializer() { + return new AvroSerializer<T>(getTypeClass()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index 5bc6cb9..30688b9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; @@ -29,9 +30,13 @@ import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { private final Class<T> typeClass; + public static boolean USE_AVRO_SERIALIZER = false; + + private boolean useAvroSerializer; public GenericTypeInfo(Class<T> typeClass) { this.typeClass = typeClass; + this.useAvroSerializer = USE_AVRO_SERIALIZER; } @Override @@ -66,7 +71,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType @Override public TypeSerializer<T> createSerializer() { - return new KryoSerializer<T>(this.typeClass); + if(useAvroSerializer) { + return new AvroSerializer<T>(this.typeClass); + } else { + return new KryoSerializer<T>(this.typeClass); + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 0103a7b..30a8df2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -34,9 +34,9 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; import org.apache.flink.api.java.typeutils.runtime.PojoComparator; -import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; import com.google.common.base.Joiner; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; /** http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index 2758bd6..d5573cb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -18,8 +18,15 @@ package org.apache.flink.api.java.typeutils.runtime; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -91,7 +98,22 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { @Override public T copy(T from) { checkKryoInitialized(); - return this.kryo.copy(from); + try { + return this.kryo.copy(from); + } catch(KryoException ke) { + // kryo was unable to copy it, so we do it through serialization: + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + Output output = new Output(baout); + + kryo.writeObject(output, from); + + output.close(); + + ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); + Input input = new Input(bain); + + return (T)kryo.readObject(input, from.getClass()); + } } @Override @@ -154,6 +176,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { private void checkKryoInitialized() { if (this.kryo == null) { this.kryo = new Kryo(); + this.kryo.register(GenericData.Array.class, new KryoSerializer.SpecificInstanceCollectionSerializer(ArrayList.class)); this.kryo.setAsmEnabled(true); this.kryo.register(type); }