Repository: flink Updated Branches: refs/heads/release-0.8 02b6f85fe -> 944e2e3d5
[backports] Cleanup and port changes to 0.8 branch. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/944e2e3d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/944e2e3d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/944e2e3d Branch: refs/heads/release-0.8 Commit: 944e2e3d5355e55a9c678158065d9ba81d4190ab Parents: 95958a9 Author: Robert Metzger <[email protected]> Authored: Thu Feb 5 14:07:48 2015 +0100 Committer: Robert Metzger <[email protected]> Committed: Mon Feb 9 14:48:34 2015 +0100 ---------------------------------------------------------------------- .../apache/flink/api/io/avro/AvroPojoTest.java | 178 +++++++++++-------- .../flink/api/java/typeutils/TypeExtractor.java | 4 +- .../java/typeutils/runtime/KryoSerializer.java | 14 +- flink-shaded/pom.xml | 2 +- 4 files changed, 114 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java index 6ff4836..b91a3d8 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -24,29 +24,33 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.AvroInputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; -import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; @RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(ExecutionMode mode) { - super(mode); +public class AvroPojoTest extends JavaProgramTestBase { + + public AvroPojoTest(Configuration config) { + super(config); } private File inFile; - private String resultPath; private String expected; @Rule @@ -59,99 +63,129 @@ public class AvroPojoTest extends MultipleProgramsTestBase { AvroRecordInputFormatTest.writeTestFile(inFile); } - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test - public void testSimpleAvroRead() throws Exception { + private String testField(final String fieldName) throws Exception { + before(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Path in = new Path(inFile.getAbsoluteFile().toURI()); AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { + DataSet<User> usersDS = env.createInput(users); + + DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; + public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { + for(User u : values) { + out.collect(u.get(fieldName)); + } } }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + if(fieldName.equals("name")) { + return "Alyssa\nCharlie"; + } else if(fieldName.equals("type_enum")) { + return "GREEN\nRED\n"; + } else if(fieldName.equals("type_double_test")) { + return "123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } - usersDS.writeAsText(resultPath); + postSubmit(); + return ""; + } - env.execute("Simple Avro read job"); + private static int NUM_PROGRAMS = 3; + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); } - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); + @Override + protected void testProgram() throws Exception { + expectedResult = runProgram(curProgId); + } - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for(User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + private String runProgram(int curProgId) throws Exception { + switch(curProgId) { + case 1: + for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + return testField(fieldName); } - } - }); + break; + case 2: + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); - res.writeAsText(resultPath); - env.execute("Avro Key selection"); + AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS = env.createInput(users); + DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { + @Override + public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); + } + } + }); - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + return "(Alyssa,1)\n(Charlie,1)\n"; + case 3: + env = ExecutionEnvironment.getExecutionEnvironment(); + in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat<User> users1 = new AvroInputFormat<User>(in, User.class); + DataSet<User> usersDS1 = env.createInput(users1) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction<User, User>() { + @Override + public User map(User value) throws Exception { + value.setTypeMap(null); + return value; + } + }); + + usersDS1.writeAsText(resultPath); + + env.execute("Simple Avro read job"); - /** - * Test some know fields for grouping on - */ - @Test - public void testAllFields() throws Exception { - for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); + + return "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + + default: + throw new RuntimeException("Unknown test"); } + return ""; } - private void testField(final String fieldName) throws Exception { - before(); + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { - @Override - public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { - for(User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - if(fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else if(fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if(fieldName.equals("type_double_test")) { - expected = "123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); } - after(); + return toParameterList(tConfigs); } } http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 2e90c8a..acf47b6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -939,7 +939,7 @@ public class TypeExtractor { // special case for POJOs generated by Avro. if(SpecificRecordBase.class.isAssignableFrom(clazz)) { - return (TypeInformation<OUT>) new AvroTypeInfo(clazz); + return (TypeInformation<X>) new AvroTypeInfo(clazz); } if (alreadySeen.contains(clazz)) { @@ -1034,7 +1034,7 @@ public class TypeExtractor { } } - private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) { + protected <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) { // try to create Type hierarchy, if the incoming only contains the most bottom one or none. if(typeHierarchy.size() <= 1) { getTypeHierarchy(typeHierarchy, clazz, Object.class); http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index da09242..b73f0b1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; @@ -31,10 +30,14 @@ import com.twitter.chill.ScalaKryoInstantiator; import com.twitter.chill.protobuf.ProtobufSerializer; import com.twitter.chill.thrift.TBaseSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.thrift.TBase; +import scala.reflect.ClassTag; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -243,14 +246,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // register the type of our class kryo.register(type); - - // register given types. we do this first so that any registration of a - // more specific serializer overrides this - for (Class<?> type : registeredTypes) { - kryo.register(type); - } - - + kryo.setRegistrationRequired(false); kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-shaded/pom.xml ---------------------------------------------------------------------- diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml index 51191c7..1f00b62 100644 --- a/flink-shaded/pom.xml +++ b/flink-shaded/pom.xml @@ -25,7 +25,7 @@ under the License. <parent> <groupId>org.apache.flink</groupId> <artifactId>flink-parent</artifactId> - <version>0.9-SNAPSHOT</version> + <version>0.8-SNAPSHOT</version> <relativePath>..</relativePath> </parent>
