Repository: flink
Updated Branches:
  refs/heads/release-0.8 02b6f85fe -> 944e2e3d5


[backports] Cleanup and port changes to 0.8 branch.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/944e2e3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/944e2e3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/944e2e3d

Branch: refs/heads/release-0.8
Commit: 944e2e3d5355e55a9c678158065d9ba81d4190ab
Parents: 95958a9
Author: Robert Metzger <[email protected]>
Authored: Thu Feb 5 14:07:48 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Mon Feb 9 14:48:34 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 178 +++++++++++--------
 .../flink/api/java/typeutils/TypeExtractor.java |   4 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  14 +-
 flink-shaded/pom.xml                            |   2 +-
 4 files changed, 114 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index 6ff4836..b91a3d8 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -24,29 +24,33 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
 
 @RunWith(Parameterized.class)
-public class AvroPojoTest extends MultipleProgramsTestBase {
-       public AvroPojoTest(ExecutionMode mode) {
-               super(mode);
+public class AvroPojoTest extends JavaProgramTestBase {
+
+       public AvroPojoTest(Configuration config) {
+               super(config);
        }
 
        private File inFile;
-       private String resultPath;
        private String expected;
 
        @Rule
@@ -59,99 +63,129 @@ public class AvroPojoTest extends MultipleProgramsTestBase 
{
                AvroRecordInputFormatTest.writeTestFile(inFile);
        }
 
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
 
-       @Test
-       public void testSimpleAvroRead() throws Exception {
+       private String testField(final String fieldName) throws Exception {
+               before();
+
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                Path in = new Path(inFile.getAbsoluteFile().toURI());
 
                AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
-                               .map(new MapFunction<User, User>() {
+               DataSet<User> usersDS = env.createInput(users);
+
+               DataSet<Object> res = 
usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
                        @Override
-                       public User map(User value) throws Exception {
-                               value.setTypeMap(null);
-                               return value;
+                       public void reduce(Iterable<User> values, 
Collector<Object> out) throws Exception {
+                               for(User u : values) {
+                                       out.collect(u.get(fieldName));
+                               }
                        }
                });
+               res.writeAsText(resultPath);
+               env.execute("Simple Avro read job");
+               if(fieldName.equals("name")) {
+                       return "Alyssa\nCharlie";
+               } else if(fieldName.equals("type_enum")) {
+                       return "GREEN\nRED\n";
+               } else if(fieldName.equals("type_double_test")) {
+                       return "123.45\n1.337\n";
+               } else {
+                       Assert.fail("Unknown field");
+               }
 
-               usersDS.writeAsText(resultPath);
+               postSubmit();
+               return "";
+       }
 
-               env.execute("Simple Avro read job");
+       private static int NUM_PROGRAMS = 3;
 
+       private int curProgId = config.getInteger("ProgramId", -1);
+       private String resultPath;
+       private String expectedResult;
 
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
-                               "{\"name\": \"Charlie\", \"favorite_number\": 
null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, 
\"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": 
false, \"type_array_string\": [], \"type_array_boolean\": [], 
\"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null}\n";
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
        }
 
-       @Test
-       public void testKeySelection() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = runProgram(curProgId);
+       }
 
-               DataSet<Tuple2<String, Integer>> res = 
usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(new Tuple2<String, 
Integer>(u.getName().toString(), 1));
+       private String runProgram(int curProgId) throws Exception {
+               switch(curProgId) {
+                       case 1:
+                               for (String fieldName : Arrays.asList("name", 
"type_enum", "type_double_test")) {
+                                       return testField(fieldName);
                                }
-                       }
-               });
+                               break;
+                       case 2:
+                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               Path in = new 
Path(inFile.getAbsoluteFile().toURI());
 
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
+                               AvroInputFormat<User> users = new 
AvroInputFormat<User>(in, User.class);
+                               DataSet<User> usersDS = env.createInput(users);
 
+                               DataSet<Tuple2<String, Integer>> res = 
usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
+                                       @Override
+                                       public void reduce(Iterable<User> 
values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                                               for (User u : values) {
+                                                       out.collect(new 
Tuple2<String, Integer>(u.getName().toString(), 1));
+                                               }
+                                       }
+                               });
 
-               expected = "(Alyssa,1)\n(Charlie,1)\n";
-       }
+                               res.writeAsText(resultPath);
+                               env.execute("Avro Key selection");
+
+
+                               return "(Alyssa,1)\n(Charlie,1)\n";
+                       case 3:
+                               env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               in = new Path(inFile.getAbsoluteFile().toURI());
+
+                               AvroInputFormat<User> users1 = new 
AvroInputFormat<User>(in, User.class);
+                               DataSet<User> usersDS1 = env.createInput(users1)
+                                               // null map type because the 
order changes in different JVMs (hard to test)
+                                               .map(new MapFunction<User, 
User>() {
+                                                       @Override
+                                                       public User map(User 
value) throws Exception {
+                                                               
value.setTypeMap(null);
+                                                               return value;
+                                                       }
+                                               });
+
+                               usersDS1.writeAsText(resultPath);
+
+                               env.execute("Simple Avro read job");
 
-       /**
-        * Test some know fields for grouping on
-        */
-       @Test
-       public void testAllFields() throws Exception {
-               for(String fieldName : Arrays.asList("name", "type_enum", 
"type_double_test")) {
-                       testField(fieldName);
+
+                               return "{\"name\": \"Alyssa\", 
\"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, 
\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": 
true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], 
\"type_array_boolean\": [true, false], \"type_nullable_array\": null, 
\"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, 
\"type_union\": null}\n" +
+                                               "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null}\n";
+
+                       default:
+                               throw new RuntimeException("Unknown test");
                }
+               return "";
        }
 
-       private void testField(final String fieldName) throws Exception {
-               before();
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
 
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
+       @Parameterized.Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
 
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               DataSet<Object> res = 
usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Object> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(u.get(fieldName));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Simple Avro read job");
-               if(fieldName.equals("name")) {
-                       expected = "Alyssa\nCharlie";
-               } else if(fieldName.equals("type_enum")) {
-                       expected = "GREEN\nRED\n";
-               } else if(fieldName.equals("type_double_test")) {
-                       expected = "123.45\n1.337\n";
-               } else {
-                       Assert.fail("Unknown field");
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
                }
 
-               after();
+               return toParameterList(tConfigs);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 2e90c8a..acf47b6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -939,7 +939,7 @@ public class TypeExtractor {
 
                // special case for POJOs generated by Avro.
                if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
-                       return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
+                       return (TypeInformation<X>) new AvroTypeInfo(clazz);
                }
 
                if (alreadySeen.contains(clazz)) {
@@ -1034,7 +1034,7 @@ public class TypeExtractor {
                }
        }
 
-       private <X> TypeInformation<X> analyzePojo(Class<X> clazz, 
ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
+       protected <X> TypeInformation<X> analyzePojo(Class<X> clazz, 
ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
                // try to create Type hierarchy, if the incoming only contains 
the most bottom one or none.
                if(typeHierarchy.size() <= 1) {
                        getTypeHierarchy(typeHierarchy, clazz, Object.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index da09242..b73f0b1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.CollectionSerializer;
@@ -31,10 +30,14 @@ import com.twitter.chill.ScalaKryoInstantiator;
 
 import com.twitter.chill.protobuf.ProtobufSerializer;
 import com.twitter.chill.thrift.TBaseSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.thrift.TBase;
+import scala.reflect.ClassTag;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -243,14 +246,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
                        // register the type of our class
                        kryo.register(type);
-                       
-                       // register given types. we do this first so that any 
registration of a
-                       // more specific serializer overrides this
-                       for (Class<?> type : registeredTypes) {
-                               kryo.register(type);
-                       }
-                       
-                       
+
                        kryo.setRegistrationRequired(false);
                        
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/944e2e3d/flink-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml
index 51191c7..1f00b62 100644
--- a/flink-shaded/pom.xml
+++ b/flink-shaded/pom.xml
@@ -25,7 +25,7 @@ under the License.
        <parent>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-parent</artifactId>
-               <version>0.9-SNAPSHOT</version>
+               <version>0.8-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 

Reply via email to the mailing list.