Repository: flink Updated Branches: refs/heads/master ef58cf302 -> 5e2fb3cb5
[FLINK-3306] [core] Fix auto-type registry util Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4bc47af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4bc47af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4bc47af Branch: refs/heads/master Commit: c4bc47afa6147dd25d8b579e764b30c9c13ee8ea Parents: d902d16 Author: Stephan Ewen <se...@apache.org> Authored: Fri Jan 29 17:08:32 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 1 14:45:55 2016 +0100 ---------------------------------------------------------------------- .../api/io/avro/AvroRecordInputFormatTest.java | 115 ++++++++--------- .../flink/api/java/ExecutionEnvironment.java | 51 +++----- .../flink/api/java/typeutils/TypeExtractor.java | 5 + .../typeutils/runtime/kryo/Serializers.java | 128 +++++++++++-------- .../kryo/KryoGenericTypeSerializerTest.java | 31 ++--- .../typeutils/runtime/kryo/SerializersTest.java | 61 +++++++-- 6 files changed, 217 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java index c04435c..42cbebe 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -22,10 +22,7 @@ import static org.junit.Assert.*; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -219,41 +216,43 @@ public class AvroRecordInputFormatTest { */ @Test public void testDeserializeToGenericType() throws IOException { - DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema); - - FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader); - // initialize Record by reading it from disk (thats easier than creating it by hand) - GenericData.Record rec = new GenericData.Record(userSchema); - dataFileReader.next(rec); - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - assertEquals(null, rec.get("type_long_test")); // it is null for the first record. - - // now serialize it with our framework: - - TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class); - ExecutionConfig ec = new ExecutionConfig(); - Assert.assertEquals(GenericTypeInfo.class, te.getClass()); - Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec); - - TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); - Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); - Assert.assertTrue( - ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && - ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); - ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); - tser.serialize(rec, target); - - GenericData.Record newRec = tser.deserialize(target.getInputView()); - - // check if it is still the same - assertNotNull(newRec); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); - assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() ); - assertEquals(null, newRec.get("type_long_test")); - + DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema); + + try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { + // initialize Record by reading it from disk (thats easier than creating it by hand) + GenericData.Record rec = new GenericData.Record(userSchema); + dataFileReader.next(rec); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + assertEquals(null, rec.get("type_long_test")); // it is null for the first record. + + // now serialize it with our framework: + TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class); + + ExecutionConfig ec = new ExecutionConfig(); + Assert.assertEquals(GenericTypeInfo.class, te.getClass()); + + Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>()); + + TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); + Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); + Assert.assertTrue( + ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && + ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + GenericData.Record newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); + assertEquals("name not equal", TEST_NAME, newRec.get("name").toString()); + assertEquals(null, newRec.get("type_long_test")); + } } /** @@ -264,28 +263,30 @@ public class AvroRecordInputFormatTest { DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); - FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader); - User rec = dataFileReader.next(); + try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { + User rec = dataFileReader.next(); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + // now serialize it with our framework: + ExecutionConfig ec = new ExecutionConfig(); + TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class); - // now serialize it with our framework: - ExecutionConfig ec = new ExecutionConfig(); - TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class); - Assert.assertEquals(AvroTypeInfo.class, te.getClass()); - TypeSerializer<User> tser = te.createSerializer(ec); - ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); - tser.serialize(rec, target); + Assert.assertEquals(AvroTypeInfo.class, te.getClass()); + TypeSerializer<User> tser = te.createSerializer(ec); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); - User newRec = tser.deserialize(target.getInputView()); + User newRec = tser.deserialize(target.getInputView()); - // check if it is still the same - assertNotNull(newRec); - assertEquals("name not equal", TEST_NAME, newRec.getName().toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() ); + // check if it is still the same + assertNotNull(newRec); + assertEquals("name not equal", TEST_NAME, newRec.getName().toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 0dd754a..10cb5e3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -20,12 +20,7 @@ package org.apache.flink.api.java; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -38,7 +33,6 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.operators.OperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.api.java.io.CsvReader; @@ -52,11 +46,7 @@ import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.api.java.typeutils.*; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -942,30 +932,23 @@ public abstract class ExecutionEnvironment { plan.setDefaultParallelism(getParallelism()); } plan.setExecutionConfig(getConfig()); + // Check plan for GenericTypeInfo's and register the types at the serializers. - plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() { - @Override - public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) { - OperatorInformation<?> opInfo = visitable.getOperatorInfo(); - TypeInformation<?> typeInfo = opInfo.getOutputType(); - if(typeInfo instanceof GenericTypeInfo) { - GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo; - if(!config.isAutoTypeRegistrationDisabled()) { - Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config); - } - } - if(typeInfo instanceof CompositeType) { - List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>(); - Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite); - for(GenericTypeInfo<?> gt : genericTypesInComposite) { - Serializers.recursivelyRegisterType(gt.getTypeClass(), config); - } + if (!config.isAutoTypeRegistrationDisabled()) { + plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() { + + private final HashSet<Class<?>> deduplicator = new HashSet<>(); + + @Override + public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) { + OperatorInformation<?> opInfo = visitable.getOperatorInfo(); + Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator); + return true; } - return true; - } - @Override - public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {} - }); + @Override + public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {} + }); + } try { registerCachedFilesWithPlan(plan); http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index ddb4a48..b8f2075 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -332,6 +332,11 @@ public class TypeExtractor { // -------------------------------------------------------------------------------------------- // Create type information // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + public static <T> TypeInformation<T> createTypeInfo(Class<T> type) { + return (TypeInformation<T>) createTypeInfo((Type) type); + } public static TypeInformation<?> createTypeInfo(Type t) { TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t); http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java index 0ea8691..6903d35 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -23,24 +23,28 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; + import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer; import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.specific.SpecificRecordBase; + import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.Serializable; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; +import java.lang.reflect.*; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; +import java.util.List; import java.util.Set; @@ -54,49 +58,78 @@ import java.util.Set; * Also, there is a Java Annotation for adding a default serializer (@DefaultSerializer) to classes. */ public class Serializers { - /** - * NOTE: This method is not a public Flink API. - * - * This method walks the entire hierarchy of the given type and registers all types it encounters - * to Kryo. - * It also watches for types which need special serializers. - */ - private static Set<Class<?>> alreadySeen = new HashSet<>(); - public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config) { - alreadySeen.add(type); + public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) { + if (typeInfo instanceof GenericTypeInfo) { + GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo; + Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config, alreadySeen); + } + else if (typeInfo instanceof CompositeType) { + List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>(); + Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite); + for (GenericTypeInfo<?> gt : genericTypesInComposite) { + Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen); + } + } + else if (typeInfo instanceof ObjectArrayTypeInfo) { + ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInfo; + recursivelyRegisterType(objectArrayTypeInfo.getComponentInfo(), config, alreadySeen); + } + } + + public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config, Set<Class<?>> alreadySeen) { + // don't register or remember primitives + if (type == null || type.isPrimitive() || type == Object.class) { + return; + } - if (type.isPrimitive()) { + // prevent infinite recursion for recursive types + if (!alreadySeen.add(type)) { return; } - config.registerKryoType(type); - addSerializerForType(config, type); - - Field[] fields = type.getDeclaredFields(); - for (Field field : fields) { - if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { - continue; - } - Type fieldType = field.getGenericType(); - if (fieldType instanceof ParameterizedType) { // field has generics - ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType; - for (Type t: parameterizedFieldType.getActualTypeArguments()) { - if (TypeExtractor.isClassType(t) ) { - Class<?> clazz = TypeExtractor.typeToClass(t); - if (!alreadySeen.contains(clazz)) { - recursivelyRegisterType(TypeExtractor.typeToClass(t), config); - } - } + + if (type.isArray()) { + recursivelyRegisterType(type.getComponentType(), config, alreadySeen); + } + else { + config.registerKryoType(type); + checkAndAddSerializerForTypeAvro(config, type); + + Field[] fields = type.getDeclaredFields(); + for (Field field : fields) { + if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { + continue; } + Type fieldType = field.getGenericType(); + recursivelyRegisterGenericType(fieldType, config, alreadySeen); } - Class<?> clazz = field.getType(); - if (!alreadySeen.contains(clazz)) { - recursivelyRegisterType(clazz, config); + } + } + + private static void recursivelyRegisterGenericType(Type fieldType, ExecutionConfig config, Set<Class<?>> alreadySeen) { + if (fieldType instanceof ParameterizedType) { + // field has generics + ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType; + + for (Type t: parameterizedFieldType.getActualTypeArguments()) { + if (TypeExtractor.isClassType(t) ) { + recursivelyRegisterType(TypeExtractor.typeToClass(t), config, alreadySeen); + } } + + recursivelyRegisterGenericType(parameterizedFieldType.getRawType(), config, alreadySeen); + } + else if (fieldType instanceof GenericArrayType) { + GenericArrayType genericArrayType = (GenericArrayType) fieldType; + recursivelyRegisterGenericType(genericArrayType.getGenericComponentType(), config, alreadySeen); + } + else if (fieldType instanceof Class) { + Class<?> clazz = (Class<?>) fieldType; + recursivelyRegisterType(clazz, config, alreadySeen); } } - public static void addSerializerForType(ExecutionConfig reg, Class<?> type) { + private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) { if (GenericData.Record.class.isAssignableFrom(type)) { registerGenericAvro(reg); } @@ -105,16 +138,13 @@ public class Serializers { Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type; registerSpecificAvro(reg, specRecordClass); } - if (DateTime.class.isAssignableFrom(type) || Interval.class.isAssignableFrom(type)) { - registerJodaTime(reg); - } } /** * Register these serializers for using Avro's {@link GenericData.Record} and classes * implementing {@link org.apache.avro.specific.SpecificRecordBase} */ - public static void registerGenericAvro(ExecutionConfig reg) { + private static void registerGenericAvro(ExecutionConfig reg) { // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class); @@ -126,8 +156,7 @@ public class Serializers { reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); } - - public static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) { + private static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) { registerGenericAvro(reg); // This rule only applies if users explicitly use the GenericTypeInformation for the avro types // usually, we are able to handle Avro POJOs with the POJO serializer. @@ -136,14 +165,13 @@ public class Serializers { // ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType); // reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); } - - + /** * Currently, the following classes of JodaTime are supported: - * - DateTime - * - Interval + * - DateTime + * - Interval * - * The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer}) + * The following chronologies are supported: (see {@link de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer}) * <ul> * <li>{@link org.joda.time.chrono.ISOChronology}</li> * <li>{@link org.joda.time.chrono.CopticChronology}</li> @@ -159,7 +187,7 @@ public class Serializers { reg.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class); reg.registerTypeWithKryoSerializer(Interval.class, JodaIntervalSerializer.class); } - + /** * Register less frequently used serializers */ http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java index 4b6e432..8ff0b1b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoGenericTypeSerializerTest.java @@ -18,23 +18,24 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; -import static org.junit.Assert.*; - import com.esotericsoftware.kryo.Kryo; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; -import org.joda.time.DateTime; -import org.junit.Test; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.junit.Test; + import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; import java.util.Random; +import static org.junit.Assert.*; + @SuppressWarnings("unchecked") public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { @@ -42,7 +43,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer @Test public void testJavaList(){ - Collection<Integer> a = new ArrayList<Integer>(); + Collection<Integer> a = new ArrayList<>(); fillCollection(a); @@ -51,7 +52,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer @Test public void testJavaSet(){ - Collection<Integer> b = new HashSet<Integer>(); + Collection<Integer> b = new HashSet<>(); fillCollection(b); @@ -62,24 +63,12 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer @Test public void testJavaDequeue(){ - Collection<Integer> c = new LinkedList<Integer>(); - + Collection<Integer> c = new LinkedList<>(); fillCollection(c); - runTests(c); } - @Test - public void testJodaTime(){ - Collection<DateTime> b = new HashSet<DateTime>(); - Serializers.registerJodaTime(ec); - b.add(new DateTime(1)); - b.add(new DateTime(2)); - - runTests(b); - } - - private void fillCollection(Collection<Integer> coll){ + private void fillCollection(Collection<Integer> coll) { coll.add(42); coll.add(1337); coll.add(49); @@ -140,7 +129,7 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer int numElements = 100; // construct a memory target that is too small for the string TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); - KryoSerializer<Integer> serializer = new KryoSerializer<Integer>(Integer.class, new ExecutionConfig()); + KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); for(int i = 0; i < numElements; i++){ serializer.serialize(i, target); http://git-wip-us.apache.org/repos/asf/flink/blob/c4bc47af/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java index 96d761a..7c6d023 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/SerializersTest.java @@ -15,15 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.typeutils.runtime.kryo; -import de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer; import org.apache.flink.api.common.ExecutionConfig; -import org.joda.time.Interval; + +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.fs.Path; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.HashSet; + +import static org.junit.Assert.assertTrue; public class SerializersTest { @@ -31,36 +36,68 @@ public class SerializersTest { public static class Node { private Node parent; } + public static class FromNested { Node recurseMe; } - public static class FromGeneric { - - } + + public static class FromGeneric1 {} + public static class FromGeneric2 {} + public static class Nested1 { private FromNested fromNested; - private Interval yodaIntervall; + private Path yodaIntervall; } public static class ClassWithNested { + Nested1 nested; int ab; - ArrayList<FromGeneric> addGenType; + + ArrayList<FromGeneric1> addGenType; + FromGeneric2[] genericArrayType; } @Test public void testTypeRegistration() { ExecutionConfig conf = new ExecutionConfig(); - Serializers.recursivelyRegisterType(ClassWithNested.class, conf); - KryoSerializer kryo = new KryoSerializer(String.class, conf); // we create Kryo from another type. + Serializers.recursivelyRegisterType(ClassWithNested.class, conf, new HashSet<Class<?>>()); + + KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. Assert.assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); Assert.assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getId() > 0); - Assert.assertTrue(kryo.getKryo().getRegistration(Interval.class).getSerializer().getClass() == JodaIntervalSerializer.class); + Assert.assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); + // check if the generic type from one field is also registered (its very likely that // generic types are also used as fields somewhere. - Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); + Assert.assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); Assert.assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); + + + // register again and make sure classes are still registered + ExecutionConfig conf2 = new ExecutionConfig(); + Serializers.recursivelyRegisterType(ClassWithNested.class, conf2, new HashSet<Class<?>>()); + KryoSerializer<String> kryo2 = new KryoSerializer<>(String.class, conf); + assertTrue(kryo2.getKryo().getRegistration(FromNested.class).getId() > 0); + } + + @Test + public void testTypeRegistrationFromTypeInfo() { + ExecutionConfig conf = new ExecutionConfig(); + Serializers.recursivelyRegisterType(new GenericTypeInfo<>(ClassWithNested.class), conf, new HashSet<Class<?>>()); + + KryoSerializer<String> kryo = new KryoSerializer<>(String.class, conf); // we create Kryo from another type. + + assertTrue(kryo.getKryo().getRegistration(FromNested.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(ClassWithNested.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(Path.class).getId() > 0); + + // check if the generic type from one field is also registered (its very likely that + // generic types are also used as fields somewhere. + assertTrue(kryo.getKryo().getRegistration(FromGeneric1.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(FromGeneric2.class).getId() > 0); + assertTrue(kryo.getKryo().getRegistration(Node.class).getId() > 0); } }