Repository: flink
Updated Branches:
  refs/heads/master ef58cf302 -> 5e2fb3cb5


[FLINK-3306] [core] Fix auto-type registry util


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bc47af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bc47af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bc47af

Branch: refs/heads/master
Commit: c4bc47afa6147dd25d8b579e764b30c9c13ee8ea
Parents: d902d16
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 29 17:08:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 1 14:45:55 2016 +0100

----------------------------------------------------------------------
 .../api/io/avro/AvroRecordInputFormatTest.java  | 115 ++++++++---------
 .../flink/api/java/ExecutionEnvironment.java    |  51 +++-----
 .../flink/api/java/typeutils/TypeExtractor.java |   5 +
 .../typeutils/runtime/kryo/Serializers.java     | 128 +++++++++++--------
 .../kryo/KryoGenericTypeSerializerTest.java     |  31 ++---
 .../typeutils/runtime/kryo/SerializersTest.java |  61 +++++++--
 6 files changed, 217 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index c04435c..42cbebe 100644
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -22,10 +22,7 @@ import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -219,41 +216,43 @@ public class AvroRecordInputFormatTest {
         */
        @Test
        public void testDeserializeToGenericType() throws IOException {
-               DatumReader<GenericData.Record> datumReader = new 
GenericDatumReader<GenericData.Record>(userSchema);
-
-               FileReader<GenericData.Record> dataFileReader = 
DataFileReader.openReader(testFile, datumReader);
-               // initialize Record by reading it from disk (thats easier than 
creating it by hand)
-               GenericData.Record rec = new GenericData.Record(userSchema);
-               dataFileReader.next(rec);
-               // check if record has been read correctly
-               assertNotNull(rec);
-               assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), 
rec.get("type_enum").toString());
-               assertEquals(null, rec.get("type_long_test")); // it is null 
for the first record.
-
-               // now serialize it with our framework:
-
-               TypeInformation<GenericData.Record> te = 
(TypeInformation<GenericData.Record>) 
TypeExtractor.createTypeInfo(GenericData.Record.class);
-               ExecutionConfig ec = new ExecutionConfig();
-               Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-               Serializers.recursivelyRegisterType(( (GenericTypeInfo) 
te).getTypeClass(), ec);
-
-               TypeSerializer<GenericData.Record> tser = 
te.createSerializer(ec);
-               Assert.assertEquals(1, 
ec.getDefaultKryoSerializerClasses().size());
-               Assert.assertTrue(
-                       
ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-                       
ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-               ComparatorTestBase.TestOutputView target = new 
ComparatorTestBase.TestOutputView();
-               tser.serialize(rec, target);
-
-               GenericData.Record newRec = 
tser.deserialize(target.getInputView());
-
-               // check if it is still the same
-               assertNotNull(newRec);
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), 
newRec.get("type_enum").toString());
-               assertEquals("name not equal", TEST_NAME, 
newRec.get("name").toString() );
-               assertEquals(null, newRec.get("type_long_test"));
-
+               DatumReader<GenericData.Record> datumReader = new 
GenericDatumReader<>(userSchema);
+
+               try (FileReader<GenericData.Record> dataFileReader = 
DataFileReader.openReader(testFile, datumReader)) {
+                       // initialize Record by reading it from disk (thats 
easier than creating it by hand)
+                       GenericData.Record rec = new 
GenericData.Record(userSchema);
+                       dataFileReader.next(rec);
+                       
+                       // check if record has been read correctly
+                       assertNotNull(rec);
+                       assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString());
+                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+                       assertEquals(null, rec.get("type_long_test")); // it is 
null for the first record.
+
+                       // now serialize it with our framework:
+                       TypeInformation<GenericData.Record> te = 
TypeExtractor.createTypeInfo(GenericData.Record.class);
+
+                       ExecutionConfig ec = new ExecutionConfig();
+                       Assert.assertEquals(GenericTypeInfo.class, 
te.getClass());
+                       
+                       Serializers.recursivelyRegisterType(te.getTypeClass(), 
ec, new HashSet<Class<?>>());
+
+                       TypeSerializer<GenericData.Record> tser = 
te.createSerializer(ec);
+                       Assert.assertEquals(1, 
ec.getDefaultKryoSerializerClasses().size());
+                       Assert.assertTrue(
+                                       
ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
+                                                       
ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
+                       ComparatorTestBase.TestOutputView target = new 
ComparatorTestBase.TestOutputView();
+                       tser.serialize(rec, target);
+
+                       GenericData.Record newRec = 
tser.deserialize(target.getInputView());
+
+                       // check if it is still the same
+                       assertNotNull(newRec);
+                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+                       assertEquals("name not equal", TEST_NAME, 
newRec.get("name").toString());
+                       assertEquals(null, newRec.get("type_long_test"));
+               }
        }
 
        /**
@@ -264,28 +263,30 @@ public class AvroRecordInputFormatTest {
 
                DatumReader<User> datumReader = new 
SpecificDatumReader<User>(userSchema);
 
-               FileReader<User> dataFileReader = 
DataFileReader.openReader(testFile, datumReader);
-               User rec = dataFileReader.next();
+               try (FileReader<User> dataFileReader = 
DataFileReader.openReader(testFile, datumReader)) {
+                       User rec = dataFileReader.next();
+
+                       // check if record has been read correctly
+                       assertNotNull(rec);
+                       assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString());
+                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
 
-               // check if record has been read correctly
-               assertNotNull(rec);
-               assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), 
rec.get("type_enum").toString());
+                       // now serialize it with our framework:
+                       ExecutionConfig ec = new ExecutionConfig();
+                       TypeInformation<User> te = 
TypeExtractor.createTypeInfo(User.class);
 
-               // now serialize it with our framework:
-               ExecutionConfig ec = new ExecutionConfig();
-               TypeInformation<User> te = (TypeInformation<User>) 
TypeExtractor.createTypeInfo(User.class);
-               Assert.assertEquals(AvroTypeInfo.class, te.getClass());
-               TypeSerializer<User> tser = te.createSerializer(ec);
-               ComparatorTestBase.TestOutputView target = new 
ComparatorTestBase.TestOutputView();
-               tser.serialize(rec, target);
+                       Assert.assertEquals(AvroTypeInfo.class, te.getClass());
+                       TypeSerializer<User> tser = te.createSerializer(ec);
+                       ComparatorTestBase.TestOutputView target = new 
ComparatorTestBase.TestOutputView();
+                       tser.serialize(rec, target);
 
-               User newRec = tser.deserialize(target.getInputView());
+                       User newRec = tser.deserialize(target.getInputView());
 
-               // check if it is still the same
-               assertNotNull(newRec);
-               assertEquals("name not equal", TEST_NAME, 
newRec.getName().toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), 
newRec.getTypeEnum().toString() );
+                       // check if it is still the same
+                       assertNotNull(newRec);
+                       assertEquals("name not equal", TEST_NAME, 
newRec.getName().toString());
+                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
+               }
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0dd754a..10cb5e3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -20,12 +20,7 @@ package org.apache.flink.api.java;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -38,7 +33,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
@@ -52,11 +46,7 @@ import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.*;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -942,30 +932,23 @@ public abstract class ExecutionEnvironment {
                        plan.setDefaultParallelism(getParallelism());
                }
                plan.setExecutionConfig(getConfig());
+               
                // Check plan for GenericTypeInfo's and register the types at 
the serializers.
-               plan.accept(new 
Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
-                       @Override
-                       public boolean 
preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
-                               OperatorInformation<?> opInfo = 
visitable.getOperatorInfo();
-                               TypeInformation<?> typeInfo = 
opInfo.getOutputType();
-                               if(typeInfo instanceof GenericTypeInfo) {
-                                       GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) typeInfo;
-                                       
if(!config.isAutoTypeRegistrationDisabled()) {
-                                               
Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config);
-                                       }
-                               }
-                               if(typeInfo instanceof CompositeType) {
-                                       List<GenericTypeInfo<?>> 
genericTypesInComposite = new ArrayList<>();
-                                       
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, 
genericTypesInComposite);
-                                       for(GenericTypeInfo<?> gt : 
genericTypesInComposite) {
-                                               
Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
-                                       }
+               if (!config.isAutoTypeRegistrationDisabled()) {
+                       plan.accept(new 
Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
+                               
+                               private final HashSet<Class<?>> deduplicator = 
new HashSet<>();
+                               
+                               @Override
+                               public boolean 
preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
+                                       OperatorInformation<?> opInfo = 
visitable.getOperatorInfo();
+                                       
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, 
deduplicator);
+                                       return true;
                                }
-                               return true;
-                       }
-                       @Override
-                       public void 
postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
-               });
+                               @Override
+                               public void 
postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
+                       });
+               }
 
                try {
                        registerCachedFilesWithPlan(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index ddb4a48..b8f2075 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -332,6 +332,11 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
        //  Create type information
        // 
--------------------------------------------------------------------------------------------
+
+       @SuppressWarnings("unchecked")
+       public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
+               return (TypeInformation<T>) createTypeInfo((Type) type);
+       }
        
        public static TypeInformation<?> createTypeInfo(Type t) {
                TypeInformation<?> ti = new 
TypeExtractor().privateCreateTypeInfo(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 0ea8691..6903d35 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -23,24 +23,28 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+
 import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecordBase;
+
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import java.lang.reflect.*;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 
@@ -54,49 +58,78 @@ import java.util.Set;
  * Also, there is a Java Annotation for adding a default serializer 
(@DefaultSerializer) to classes.
  */
 public class Serializers {
-       /**
-        * NOTE: This method is not a public Flink API.
-        *
-        * This method walks the entire hierarchy of the given type and 
registers all types it encounters
-        * to Kryo.
-        * It also watches for types which need special serializers.
-        */
-       private static Set<Class<?>> alreadySeen = new HashSet<>();
 
-       public static void recursivelyRegisterType(Class<?> type, 
ExecutionConfig config) {
-               alreadySeen.add(type);
+       public static void recursivelyRegisterType(TypeInformation<?> typeInfo, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
+               if (typeInfo instanceof GenericTypeInfo) {
+                       GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) typeInfo;
+                       
Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config, 
alreadySeen);
+               }
+               else if (typeInfo instanceof CompositeType) {
+                       List<GenericTypeInfo<?>> genericTypesInComposite = new 
ArrayList<>();
+                       
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, 
genericTypesInComposite);
+                       for (GenericTypeInfo<?> gt : genericTypesInComposite) {
+                               
Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
+                       }
+               }
+               else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = 
(ObjectArrayTypeInfo<?, ?>) typeInfo;
+                       
recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, 
alreadySeen);
+               }
+       }
+       
+       public static void recursivelyRegisterType(Class<?> type, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
+               // don't register or remember primitives
+               if (type == null || type.isPrimitive() || type == Object.class) 
{
+                       return;
+               }
                
-               if (type.isPrimitive()) {
+               // prevent infinite recursion for recursive types
+               if (!alreadySeen.add(type)) {
                        return;
                }
-               config.registerKryoType(type);
-               addSerializerForType(config, type);
-
-               Field[] fields = type.getDeclaredFields();
-               for (Field field : fields) {
-                       if(Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
-                               continue;
-                       }
-                       Type fieldType = field.getGenericType();
-                       if (fieldType instanceof ParameterizedType) { // field 
has generics
-                               ParameterizedType parameterizedFieldType = 
(ParameterizedType) fieldType;
-                               for (Type t: 
parameterizedFieldType.getActualTypeArguments()) {
-                                       if (TypeExtractor.isClassType(t) ) {
-                                               Class<?> clazz = 
TypeExtractor.typeToClass(t);
-                                               if 
(!alreadySeen.contains(clazz)) {
-                                                       
recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
-                                               }
-                                       }
+               
+               if (type.isArray()) {
+                       recursivelyRegisterType(type.getComponentType(), 
config, alreadySeen);
+               }
+               else {
+                       config.registerKryoType(type);
+                       checkAndAddSerializerForTypeAvro(config, type);
+       
+                       Field[] fields = type.getDeclaredFields();
+                       for (Field field : fields) {
+                               if (Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
+                                       continue;
                                }
+                               Type fieldType = field.getGenericType();
+                               recursivelyRegisterGenericType(fieldType, 
config, alreadySeen);
                        }
-                       Class<?> clazz = field.getType();
-                       if (!alreadySeen.contains(clazz)) {
-                               recursivelyRegisterType(clazz, config);
+               }
+       }
+       
+       private static void recursivelyRegisterGenericType(Type fieldType, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
+               if (fieldType instanceof ParameterizedType) {
+                       // field has generics
+                       ParameterizedType parameterizedFieldType = 
(ParameterizedType) fieldType;
+                       
+                       for (Type t: 
parameterizedFieldType.getActualTypeArguments()) {
+                               if (TypeExtractor.isClassType(t) ) {
+                                       
recursivelyRegisterType(TypeExtractor.typeToClass(t), config, alreadySeen);
+                               }
                        }
+
+                       
recursivelyRegisterGenericType(parameterizedFieldType.getRawType(), config, 
alreadySeen);
+               }
+               else if (fieldType instanceof GenericArrayType) {
+                       GenericArrayType genericArrayType = (GenericArrayType) 
fieldType;
+                       
recursivelyRegisterGenericType(genericArrayType.getGenericComponentType(), 
config, alreadySeen);
+               }
+               else if (fieldType instanceof Class) {
+                       Class<?> clazz = (Class<?>) fieldType;
+                       recursivelyRegisterType(clazz, config, alreadySeen);
                }
        }
 
-       public static void addSerializerForType(ExecutionConfig reg, Class<?> 
type) {
+       private static void checkAndAddSerializerForTypeAvro(ExecutionConfig 
reg, Class<?> type) {
                if (GenericData.Record.class.isAssignableFrom(type)) {
                        registerGenericAvro(reg);
                }
@@ -105,16 +138,13 @@ public class Serializers {
                        Class<? extends SpecificRecordBase> specRecordClass = 
(Class<? extends SpecificRecordBase>) type;
                        registerSpecificAvro(reg, specRecordClass);
                }
-               if (DateTime.class.isAssignableFrom(type) || 
Interval.class.isAssignableFrom(type)) {
-                       registerJodaTime(reg);
-               }
        }
 
        /**
         * Register these serializers for using Avro's {@link 
GenericData.Record} and classes
         * implementing {@link org.apache.avro.specific.SpecificRecordBase}
         */
-       public static void registerGenericAvro(ExecutionConfig reg) {
+       private static void registerGenericAvro(ExecutionConfig reg) {
                // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
                // because Kryo is not able to serialize them properly, we use 
this serializer for them
                reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
SpecificInstanceCollectionSerializerForArrayList.class);
@@ -126,8 +156,7 @@ public class Serializers {
                reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
        }
 
-
-       public static void registerSpecificAvro(ExecutionConfig reg, Class<? 
extends SpecificRecordBase> avroType) {
+       private static void registerSpecificAvro(ExecutionConfig reg, Class<? 
extends SpecificRecordBase> avroType) {
                registerGenericAvro(reg);
                // This rule only applies if users explicitly use the 
GenericTypeInformation for the avro types
                // usually, we are able to handle Avro POJOs with the POJO 
serializer.
@@ -136,14 +165,13 @@ public class Serializers {
        //      ClassTag<SpecificRecordBase> tag = 
scala.reflect.ClassTag$.MODULE$.apply(avroType);
        //      reg.registerTypeWithKryoSerializer(avroType, 
com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
        }
-
-
+       
        /**
         * Currently, the following classes of JodaTime are supported:
-        *      - DateTime
-        *      - Interval
+        *      - DateTime
+        *      - Interval
         *
-        *      The following chronologies are supported: (see {@link 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
+        *      The following chronologies are supported: (see {@link 
de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer})
         * <ul>
         * <li>{@link org.joda.time.chrono.ISOChronology}</li>
         * <li>{@link org.joda.time.chrono.CopticChronology}</li>
@@ -159,7 +187,7 @@ public class Serializers {
                reg.registerTypeWithKryoSerializer(DateTime.class, 
JodaDateTimeSerializer.class);
                reg.registerTypeWithKryoSerializer(Interval.class, 
JodaIntervalSerializer.class);
        }
-
+       
        /**
         * Register less frequently used serializers
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
index 4b6e432..8ff0b1b 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java
@@ -18,23 +18,24 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import static org.junit.Assert.*;
-
 import com.esotericsoftware.kryo.Kryo;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
 import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
-import org.joda.time.DateTime;
-import org.junit.Test;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
+import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Random;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("unchecked")
 public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializerTest {
 
@@ -42,7 +43,7 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
        
        @Test
        public void testJavaList(){
-               Collection<Integer> a = new ArrayList<Integer>();
+               Collection<Integer> a = new ArrayList<>();
 
                fillCollection(a);
 
@@ -51,7 +52,7 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
 
        @Test
        public void testJavaSet(){
-               Collection<Integer> b = new HashSet<Integer>();
+               Collection<Integer> b = new HashSet<>();
 
                fillCollection(b);
 
@@ -62,24 +63,12 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
 
        @Test
        public void testJavaDequeue(){
-               Collection<Integer> c = new LinkedList<Integer>();
-
+               Collection<Integer> c = new LinkedList<>();
                fillCollection(c);
-
                runTests(c);
        }
 
-       @Test
-       public void testJodaTime(){
-               Collection<DateTime> b = new HashSet<DateTime>();
-               Serializers.registerJodaTime(ec);
-               b.add(new DateTime(1));
-               b.add(new DateTime(2));
-
-               runTests(b);
-       }
-
-       private void fillCollection(Collection<Integer> coll){
+       private void fillCollection(Collection<Integer> coll) {
                coll.add(42);
                coll.add(1337);
                coll.add(49);
@@ -140,7 +129,7 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
                        int numElements = 100;
                        // construct a memory target that is too small for the 
string
                        TestDataOutputSerializer target = new 
TestDataOutputSerializer(5*numElements, 5*numElements);
-                       KryoSerializer<Integer> serializer = new 
KryoSerializer<Integer>(Integer.class, new ExecutionConfig());
+                       KryoSerializer<Integer> serializer = new 
KryoSerializer<>(Integer.class, new ExecutionConfig());
 
                        for(int i = 0; i < numElements; i++){
                                serializer.serialize(i, target);

http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
index 96d761a..7c6d023 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java
@@ -15,15 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.joda.time.Interval;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.fs.Path;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertTrue;
 
 public class SerializersTest {
 
@@ -31,36 +36,68 @@ public class SerializersTest {
        public static class Node {
                private Node parent;
        }
+       
        public static class FromNested {
                Node recurseMe;
        }
-       public static class FromGeneric {
-
-       }
+       
+       public static class FromGeneric1 {}
+       public static class FromGeneric2 {}
+       
        public static class Nested1 {
                private FromNested fromNested;
-               private Interval yodaIntervall;
+               private Path yodaIntervall;
        }
 
        public static class ClassWithNested {
+               
                Nested1 nested;
                int ab;
-               ArrayList<FromGeneric> addGenType;
+               
+               ArrayList<FromGeneric1> addGenType;
+               FromGeneric2[] genericArrayType;
        }
 
        @Test
        public void testTypeRegistration() {
                ExecutionConfig conf = new ExecutionConfig();
-               Serializers.recursivelyRegisterType(ClassWithNested.class, 
conf);
-               KryoSerializer kryo = new KryoSerializer(String.class, conf); 
// we create Kryo from another type.
+               Serializers.recursivelyRegisterType(ClassWithNested.class, 
conf, new HashSet<Class<?>>());
+               
+               KryoSerializer<String> kryo = new 
KryoSerializer<>(String.class, conf); // we create Kryo from another type.
 
                
Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
                
Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() 
> 0);
-               
Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getId() > 0);
-               
Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getSerializer().getClass()
 == JodaIntervalSerializer.class);
+               
Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0);
+               
                // check if the generic type from one field is also registered 
(its very likely that
                // generic types are also used as fields somewhere.
-               
Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric.class).getId() > 
0);
+               
Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 
0);
+               
Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 
0);
                
Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0);
+               
+               
+               // register again and make sure classes are still registered
+               ExecutionConfig conf2 = new ExecutionConfig();
+               Serializers.recursivelyRegisterType(ClassWithNested.class, 
conf2, new HashSet<Class<?>>());
+               KryoSerializer<String> kryo2 = new 
KryoSerializer<>(String.class, conf);
+               
assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0);
+       }
+
+       @Test
+       public void testTypeRegistrationFromTypeInfo() {
+               ExecutionConfig conf = new ExecutionConfig();
+               Serializers.recursivelyRegisterType(new 
GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>());
+
+               KryoSerializer<String> kryo = new 
KryoSerializer<>(String.class, conf); // we create Kryo from another type.
+
+               
assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0);
+               
assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 
0);
+
+               // check if the generic type from one field is also registered 
(its very likely that
+               // generic types are also used as fields somewhere.
+               
assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0);
+               
assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0);
+               assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 
0);
        }
 }

Reply via email to