[FLINK-1463] Fix stateful/stateless Serializers and Comparators Before, Serializers would announce whether they are stateful or not and rely on RuntimeStatefulSerializerFactory to do the duplication. Comparators, on the other hand, had a duplicate method that the user was required to call.
This commit removes the statful/stateless property from Serializers but instead introduces a duplicate() method, similar to Comparators, that can return the same instance. The two serializer factories are merged into one that always calls duplicate() before returning a serializer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b6f85f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b6f85f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b6f85f Branch: refs/heads/release-0.8 Commit: 02b6f85fed8c1409b586e1e934acd72cee54adac Parents: 91382bb Author: Aljoscha Krettek <[email protected]> Authored: Fri Jan 30 16:43:31 2015 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Feb 9 14:20:23 2015 +0100 ---------------------------------------------------------------------- .../api/function/source/FileSourceFunction.java | 10 +- .../streamrecord/StreamRecordSerializer.java | 4 +- .../compiler/postpass/JavaApiPostPass.java | 11 +- .../api/common/typeutils/TypeSerializer.java | 16 +-- .../typeutils/base/BooleanSerializer.java | 5 - .../typeutils/base/BooleanValueSerializer.java | 5 - .../common/typeutils/base/ByteSerializer.java | 5 - .../typeutils/base/ByteValueSerializer.java | 5 - .../common/typeutils/base/CharSerializer.java | 5 - .../typeutils/base/CharValueSerializer.java | 5 - .../common/typeutils/base/DateSerializer.java | 5 - .../common/typeutils/base/DoubleSerializer.java | 5 - .../typeutils/base/DoubleValueSerializer.java | 5 - .../common/typeutils/base/EnumSerializer.java | 4 +- .../common/typeutils/base/FloatSerializer.java | 5 - .../typeutils/base/FloatValueSerializer.java | 5 - .../typeutils/base/GenericArraySerializer.java | 12 +- .../common/typeutils/base/IntSerializer.java | 5 - .../typeutils/base/IntValueSerializer.java | 5 - .../common/typeutils/base/LongSerializer.java | 5 - .../typeutils/base/LongValueSerializer.java | 5 - .../common/typeutils/base/ShortSerializer.java | 5 - .../typeutils/base/ShortValueSerializer.java | 5 - .../common/typeutils/base/StringSerializer.java | 5 - .../typeutils/base/StringValueSerializer.java | 5 - .../typeutils/base/TypeSerializerSingleton.java | 7 +- .../common/typeutils/base/VoidSerializer.java | 5 - .../array/BooleanPrimitiveArraySerializer.java | 5 - .../array/BytePrimitiveArraySerializer.java | 5 - .../array/CharPrimitiveArraySerializer.java | 5 - .../array/DoublePrimitiveArraySerializer.java | 5 - .../array/FloatPrimitiveArraySerializer.java | 5 - .../base/array/IntPrimitiveArraySerializer.java | 5 - .../array/LongPrimitiveArraySerializer.java | 5 - .../array/ShortPrimitiveArraySerializer.java | 5 - .../base/array/StringArraySerializer.java | 5 - .../typeutils/record/RecordSerializer.java | 5 +- .../java/typeutils/runtime/AvroSerializer.java | 4 +- .../runtime/CopyableValueSerializer.java | 4 +- .../runtime/GenericTypeComparator.java | 14 +- .../java/typeutils/runtime/KryoSerializer.java | 4 +- .../java/typeutils/runtime/PojoSerializer.java | 33 +++-- .../runtime/RuntimeSerializerFactory.java | 124 ++++++++++++++++ .../RuntimeStatefulSerializerFactory.java | 140 ------------------- .../RuntimeStatelessSerializerFactory.java | 120 ---------------- .../typeutils/runtime/TupleComparatorBase.java | 28 +--- .../java/typeutils/runtime/TupleSerializer.java | 20 +++ .../typeutils/runtime/TupleSerializerBase.java | 19 +-- .../java/typeutils/runtime/ValueSerializer.java | 4 +- .../typeutils/runtime/WritableSerializer.java | 4 +- .../api/java/io/CollectionInputFormatTest.java | 4 +- .../operators/drivers/TestTaskContext.java | 6 +- .../sort/MassiveStringSortingITCase.java | 7 +- .../sort/MassiveStringValueSortingITCase.java | 7 +- .../testutils/types/IntListSerializer.java | 4 +- .../testutils/types/IntPairSerializer.java | 8 +- .../testutils/types/StringPairSerializer.java | 4 +- .../scala/typeutils/CaseClassSerializer.scala | 16 ++- .../api/scala/typeutils/EitherSerializer.scala | 2 +- .../api/scala/typeutils/NothingSerializer.scala | 2 +- .../api/scala/typeutils/OptionSerializer.scala | 2 +- .../scala/typeutils/TraversableSerializer.scala | 18 ++- .../api/scala/typeutils/TrySerializer.scala | 2 +- .../VertexWithAdjacencyListSerializer.java | 5 - .../VertexWithRankAndDanglingSerializer.java | 5 - .../types/VertexWithRankSerializer.java | 5 - .../misc/MassiveCaseClassSortingITCase.scala | 6 +- 67 files changed, 268 insertions(+), 567 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index 5dfe4b2..a5ef3a7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -24,8 +24,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -49,12 +48,7 @@ public class FileSourceFunction extends RichSourceFunction<String> { private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { TypeSerializer<String> serializer = typeInfo.createSerializer(); - if (serializer.isStateful()) { - return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass()); - } else { - return new RuntimeStatelessSerializerFactory<String>(serializer, - typeInfo.getTypeClass()); - } + return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 85faa9e..98f12ec 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -47,8 +47,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord } @Override - public boolean isStateful() { - return false; + public StreamRecordSerializer<T> duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java index 2273461..e1b5bf8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java @@ -43,8 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.CompilerPostPassException; import org.apache.flink.compiler.plan.BulkIterationPlanNode; @@ -278,12 +277,8 @@ public class JavaApiPostPass implements OptimizerPostPass { private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) { TypeSerializer<T> serializer = typeInfo.createSerializer(); - - if (serializer.isStateful()) { - return new RuntimeStatefulSerializerFactory<T>(serializer, typeInfo.getTypeClass()); - } else { - return new RuntimeStatelessSerializerFactory<T>(serializer, typeInfo.getTypeClass()); - } + + return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass()); } private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index 5e32c86..329e826 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -50,17 +50,15 @@ public abstract class TypeSerializer<T> implements Serializable { */ public abstract boolean isImmutableType(); - /** - * Gets whether the serializer is stateful. Statefulness means in this context that some of the serializer's - * methods have objects with state and are thus not inherently thread-safe. A stateful serializer might be used by - * multiple threads concurrently. For a stateful one, different instances will be used by different threads. - * - * @return True, if the serializer is stateful, false if it is stateless; + * Creates a deep copy of this serializer if it is necessary, i.e. if it is stateful. This + * can return itself if the serializer is not stateful. + * + * We need this because Serializers might be used in several threads. Stateless serializers + * are inherently thread-safe while stateful serializers might not be thread-safe. */ - public abstract boolean isStateful(); - - + public abstract TypeSerializer<T> duplicate(); + // -------------------------------------------------------------------------------------------- // Instantiation & Cloning // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java index ecfb3c2..a844ac8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java @@ -37,11 +37,6 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Boolean createInstance() { return FALSE; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java index 4795055..3aae95d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java @@ -38,11 +38,6 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea } @Override - public boolean isStateful() { - return false; - } - - @Override public BooleanValue createInstance() { return new BooleanValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java index 32f3edd..92b3685 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java @@ -39,11 +39,6 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Byte createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java index 24cc98b..e523d5e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java @@ -38,11 +38,6 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue } @Override - public boolean isStateful() { - return false; - } - - @Override public ByteValue createInstance() { return new ByteValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java index c46d3a0..181db56 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java @@ -39,11 +39,6 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Character createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java index 71a8ef4..690509c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java @@ -37,11 +37,6 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> { } @Override - public boolean isStateful() { - return false; - } - - @Override public CharValue createInstance() { return new CharValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java index 4bd2ea8..6aa11eb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java @@ -37,11 +37,6 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Date createInstance() { return new Date(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java index 8e09f7c..24af95c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java @@ -38,11 +38,6 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Double createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java index f4c7f37..34434f1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java @@ -38,11 +38,6 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV } @Override - public boolean isStateful() { - return false; - } - - @Override public DoubleValue createInstance() { return new DoubleValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java index 7ecf82a..643e4fa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java @@ -46,8 +46,8 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return false; + public EnumSerializer<T> duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java index b1a46b0..c823783 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java @@ -38,11 +38,6 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Float createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java index 6ebb268..15d00b5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java @@ -38,11 +38,6 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> { } @Override - public boolean isStateful() { - return false; - } - - @Override public FloatValue createInstance() { return new FloatValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index c72132d..355abd0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -50,15 +50,21 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> { this.componentSerializer = componentSerializer; this.EMPTY = create(0); } - + @Override public boolean isImmutableType() { return false; } @Override - public boolean isStateful() { - return this.componentSerializer.isStateful(); + public GenericArraySerializer<C> duplicate() { + TypeSerializer<C> duplicateComponentSerializer = this.componentSerializer.duplicate(); + if (duplicateComponentSerializer == this.componentSerializer) { + // is not stateful, return ourselves + return this; + } else { + return new GenericArraySerializer<C>(componentClass, duplicateComponentSerializer); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java index 2937b2a..778f044 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java @@ -39,11 +39,6 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Integer createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java index ec1f345..c2d1b60 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java @@ -38,11 +38,6 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> } @Override - public boolean isStateful() { - return false; - } - - @Override public IntValue createInstance() { return new IntValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java index 6b25596..6d8b758 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java @@ -39,11 +39,6 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Long createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java index 95caf04..37dec40 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java @@ -38,11 +38,6 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue } @Override - public boolean isStateful() { - return false; - } - - @Override public LongValue createInstance() { return new LongValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java index c6e7870..44e5e3e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java @@ -39,11 +39,6 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Short createInstance() { return ZERO; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java index ab58987..1dbe4a5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java @@ -38,11 +38,6 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal } @Override - public boolean isStateful() { - return false; - } - - @Override public ShortValue createInstance() { return new ShortValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java index 71221a2..7b26600 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java @@ -39,11 +39,6 @@ public final class StringSerializer extends TypeSerializerSingleton<String> { } @Override - public boolean isStateful() { - return false; - } - - @Override public String createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java index c5d5b55..7628cab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java @@ -40,11 +40,6 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV } @Override - public boolean isStateful() { - return false; - } - - @Override public StringValue createInstance() { return new StringValue(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index 979d5ab..e076e5b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -25,7 +25,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{ private static final long serialVersionUID = 8766687317209282373L; // -------------------------------------------------------------------------------------------- - + + @Override + public TypeSerializerSingleton<T> duplicate() { + return this; + } + @Override public int hashCode() { return super.hashCode(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java index 33bb901..272ffbd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java @@ -35,11 +35,6 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> { } @Override - public boolean isStateful() { - return false; - } - - @Override public Void createInstance() { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java index e9941a8..4a493ac 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java @@ -40,11 +40,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet public boolean isImmutableType() { return false; } - - @Override - public boolean isStateful() { - return false; - } @Override public boolean[] createInstance() { http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java index aaf867f..fb4d506 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton< } @Override - public boolean isStateful() { - return false; - } - - @Override public byte[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java index 64632bd..8e3c4ea 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java @@ -42,11 +42,6 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton< } @Override - public boolean isStateful() { - return false; - } - - @Override public char[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java index 846ae74..10e25c2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java @@ -42,11 +42,6 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto } @Override - public boolean isStateful() { - return false; - } - - @Override public double[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java index 8f42ac8..d57af00 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java @@ -42,11 +42,6 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton } @Override - public boolean isStateful() { - return false; - } - - @Override public float[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java index 2ab056c..eaff287 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java @@ -42,11 +42,6 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{ } @Override - public boolean isStateful() { - return false; - } - - @Override public int[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java index 5d34dfe..55a22c2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java @@ -42,11 +42,6 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton< } @Override - public boolean isStateful() { - return false; - } - - @Override public long[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java index 2f37033..08275b0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java @@ -40,11 +40,6 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton public boolean isImmutableType() { return false; } - - @Override - public boolean isStateful() { - return false; - } @Override public short[] createInstance() { http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java index d5ab030..ad172a8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java @@ -44,11 +44,6 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[ } @Override - public boolean isStateful() { - return false; - } - - @Override public String[] createInstance() { return EMPTY; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java index 7b72e89..11b21d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java @@ -57,8 +57,9 @@ public final class RecordSerializer extends TypeSerializer<Record> { } @Override - public boolean isStateful() { - return false; + public RecordSerializer duplicate() { + // does not hold state, so just return ourselves + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index cc72fa3..2758bd6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -79,8 +79,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return true; + public AvroSerializer duplicate() { + return new AvroSerializer(type, typeToInstantiate); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java index 8710f2d..193d495 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java @@ -47,8 +47,8 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer } @Override - public boolean isStateful() { - return false; + public CopyableValueSerializer<T> duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java index 7caa770..039cef7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java @@ -24,7 +24,6 @@ import java.io.IOException; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; @@ -42,9 +41,7 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat private final Class<T> type; - private final TypeSerializerFactory<T> serializerFactory; - - private transient TypeSerializer<T> serializer; + private TypeSerializer<T> serializer; private transient T reference; @@ -61,15 +58,11 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat this.ascending = ascending; this.serializer = serializer; this.type = type; - - this.serializerFactory = this.serializer.isStateful() - ? new RuntimeStatefulSerializerFactory<T>(this.serializer, this.type) - : new RuntimeStatelessSerializerFactory<T>(this.serializer, this.type); } private GenericTypeComparator(GenericTypeComparator<T> toClone) { this.ascending = toClone.ascending; - this.serializerFactory = toClone.serializerFactory; + this.serializer = toClone.serializer.duplicate(); this.type = toClone.type; } @@ -104,9 +97,6 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat @Override public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException { - if (this.serializer == null) { - this.serializer = this.serializerFactory.getSerializer(); - } if (this.reference == null) { this.reference = this.serializer.createInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 3a51534..a5f5f65 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -63,8 +63,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return true; + public KryoSerializer<T> duplicate() { + return new KryoSerializer<T>(this.type); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 99b9f65..7bd3243 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -42,9 +42,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private final int numFields; - private final boolean stateful; - - @SuppressWarnings("unchecked") public PojoSerializer(Class<T> clazz, TypeSerializer<?>[] fieldSerializers, Field[] fields) { this.clazz = clazz; @@ -55,15 +52,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); } - - boolean stateful = false; - for (TypeSerializer<?> ser : fieldSerializers) { - if (ser.isStateful()) { - stateful = true; - break; - } - } - this.stateful = stateful; } private void writeObject(ObjectOutputStream out) @@ -109,10 +97,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return this.stateful; + public PojoSerializer<T> duplicate() { + boolean stateful = false; + TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + + for (int i = 0; i < fieldSerializers.length; i++) { + duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); + if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + // at least one of them is stateful + stateful = true; + } + } + + if (stateful) { + return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields); + } else { + return this; + } } - + @Override public T createInstance() { http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java new file mode 100644 index 0000000..96aff73 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.InstantiationUtil; + +public final class RuntimeSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + + private static final String CONFIG_KEY_SER = "SER_DATA"; + + private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; + + + private TypeSerializer<T> serializer; + + private boolean firstSerializer = true; + + private Class<T> clazz; + + // Because we read the class from the TaskConfig and instantiate ourselves + public RuntimeSerializerFactory() {} + + public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) { + if (serializer == null || clazz == null) { + throw new NullPointerException(); + } + + this.clazz = clazz; + this.serializer = serializer; + } + + + @Override + public void writeParametersToConfig(Configuration config) { + try { + InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); + InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER); + } + catch (Exception e) { + throw new RuntimeException("Could not serialize serializer into the configuration.", e); + } + } + + @SuppressWarnings("unchecked") + @Override + public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { + if (config == null || cl == null) { + throw new NullPointerException(); + } + + try { + this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); + this.serializer = (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl); + firstSerializer = true; + } + catch (ClassNotFoundException e) { + throw e; + } + catch (Exception e) { + throw new RuntimeException("Could not load deserializer from the configuration.", e); + } + } + + @Override + public TypeSerializer<T> getSerializer() { + if (this.serializer != null) { + if (firstSerializer) { + firstSerializer = false; + return this.serializer; + } else { + return this.serializer.duplicate(); + } + } else { + throw new RuntimeException("SerializerFactory has not been initialized from configuration."); + } + } + + @Override + public Class<T> getDataType() { + return clazz; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return clazz.hashCode() ^ serializer.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof RuntimeSerializerFactory) { + RuntimeSerializerFactory<?> other = (RuntimeSerializerFactory<?>) obj; + + return this.clazz == other.clazz && + this.serializer.equals(other.serializer); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java deleted file mode 100644 index 19cd3b7..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.InstantiationUtil; - -public final class RuntimeStatefulSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private static final String CONFIG_KEY_SER = "SER_DATA"; - - private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; - - private byte[] serializerData; - - private TypeSerializer<T> serializer; // only for equality comparisons - - private transient ClassLoader loader; - - private Class<T> clazz; - - public RuntimeStatefulSerializerFactory() {} - - public RuntimeStatefulSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) { - this.clazz = clazz; - this.loader = serializer.getClass().getClassLoader(); - - try { - this.serializerData = InstantiationUtil.serializeObject(serializer); - } catch (IOException e) { - throw new RuntimeException("Cannt serialize the Serializer.", e); - } - } - - - @Override - public void writeParametersToConfig(Configuration config) { - try { - InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); - config.setBytes(CONFIG_KEY_SER, this.serializerData); - } - catch (Exception e) { - throw new RuntimeException("Could not serialize serializer into the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - if (config == null || cl == null) { - throw new NullPointerException(); - } - - this.serializerData = config.getBytes(CONFIG_KEY_SER, null); - if (this.serializerData == null) { - throw new RuntimeException("Could not find deserializer in the configuration."); - } - - this.loader = cl; - - try { - this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); - } - catch (ClassNotFoundException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Could not load deserializer from the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public TypeSerializer<T> getSerializer() { - if (serializerData != null) { - try { - return (TypeSerializer<T>) InstantiationUtil.deserializeObject(this.serializerData, this.loader); - } catch (Exception e) { - throw new RuntimeException("Repeated instantiation of serializer failed.", e); - } - } else { - throw new RuntimeException("SerializerFactory has not been initialized from configuration."); - } - } - - @Override - public Class<T> getDataType() { - return this.clazz; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return clazz.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof RuntimeStatefulSerializerFactory) { - @SuppressWarnings("unchecked") - RuntimeStatefulSerializerFactory<T> other = (RuntimeStatefulSerializerFactory<T>) obj; - - if (this.serializer == null) { - this.serializer = getSerializer(); - } - if (other.serializer == null) { - other.serializer = other.getSerializer(); - } - - return this.clazz == other.clazz && - this.serializer.equals(other.serializer); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java deleted file mode 100644 index 041b824..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.InstantiationUtil; - -public final class RuntimeStatelessSerializerFactory<T> implements TypeSerializerFactory<T>, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - - private static final String CONFIG_KEY_SER = "SER_DATA"; - - private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; - - - private TypeSerializer<T> serializer; - - private Class<T> clazz; - - - public RuntimeStatelessSerializerFactory() {} - - public RuntimeStatelessSerializerFactory(TypeSerializer<T> serializer, Class<T> clazz) { - if (serializer == null || clazz == null) { - throw new NullPointerException(); - } - - if (serializer.isStateful()) { - throw new IllegalArgumentException("Cannot use the stateless serializer factory with a stateful serializer."); - } - - this.clazz = clazz; - this.serializer = serializer; - } - - - @Override - public void writeParametersToConfig(Configuration config) { - try { - InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); - InstantiationUtil.writeObjectToConfig(serializer, config, CONFIG_KEY_SER); - } - catch (Exception e) { - throw new RuntimeException("Could not serialize serializer into the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - if (config == null || cl == null) { - throw new NullPointerException(); - } - - try { - this.clazz = (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); - this.serializer = (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl); - } - catch (ClassNotFoundException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Could not load deserializer from the configuration.", e); - } - } - - @Override - public TypeSerializer<T> getSerializer() { - if (this.serializer != null) { - return this.serializer; - } else { - throw new RuntimeException("SerializerFactory has not been initialized from configuration."); - } - } - - @Override - public Class<T> getDataType() { - return clazz; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return clazz.hashCode() ^ serializer.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof RuntimeStatelessSerializerFactory) { - RuntimeStatelessSerializerFactory<?> other = (RuntimeStatelessSerializerFactory<?>) obj; - - return this.clazz == other.clazz && - this.serializer.equals(other.serializer); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java index abcf89c..28169e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.flink.api.common.typeutils.CompositeTypeComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.KeyFieldOutOfBoundsException; @@ -41,10 +40,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> @SuppressWarnings("rawtypes") protected TypeComparator[] comparators; - /** serializer factories to duplicate non thread-safe serializers */ - protected TypeSerializerFactory<Object>[] serializerFactories; - - protected int[] normalizedKeyLengths; protected int numLeadingNormalizableKeys; @@ -56,7 +51,7 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> /** serializers to deserialize the first n fields for comparison */ @SuppressWarnings("rawtypes") - protected transient TypeSerializer[] serializers; + protected TypeSerializer[] serializers; // cache for the deserialized field objects protected transient Object[] deserializedFields1; @@ -70,14 +65,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> this.comparators = (TypeComparator<Object>[]) comparators; this.serializers = (TypeSerializer<Object>[]) serializers; - // set the serializer factories. - this.serializerFactories = new TypeSerializerFactory[this.serializers.length]; - for (int i = 0; i < serializers.length; i++) { - this.serializerFactories[i] = this.serializers[i].isStateful() ? - new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) : - new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class); - } - // set up auxiliary fields for normalized key support this.normalizedKeyLengths = new int[keyPositions.length]; int nKeys = 0; @@ -129,7 +116,11 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> protected void privateDuplicate(TupleComparatorBase<T> toClone) { // copy fields and serializer factories this.keyPositions = toClone.keyPositions; - this.serializerFactories = toClone.serializerFactories; + + this.serializers = new TypeSerializer[toClone.serializers.length]; + for (int i = 0; i < toClone.serializers.length; i++) { + this.serializers[i] = toClone.serializers[i].duplicate(); + } this.comparators = new TypeComparator[toClone.comparators.length]; for (int i = 0; i < toClone.comparators.length; i++) { @@ -261,13 +252,6 @@ public abstract class TupleComparatorBase<T> extends CompositeTypeComparator<T> // -------------------------------------------------------------------------------------------- protected final void instantiateDeserializationUtils() { - if (this.serializers == null) { - this.serializers = new TypeSerializer[this.serializerFactories.length]; - for (int i = 0; i < this.serializers.length; i++) { - this.serializers[i] = this.serializerFactories[i].getSerializer(); - } - } - this.deserializedFields1 = new Object[this.serializers.length]; this.deserializedFields2 = new Object[this.serializers.length]; http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index ae429a7..9071446 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -36,6 +36,26 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase< } @Override + public TupleSerializer<T> duplicate() { + boolean stateful = false; + TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + + for (int i = 0; i < fieldSerializers.length; i++) { + duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); + if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + // at least one of them is stateful + stateful = true; + } + } + + if (stateful) { + return new TupleSerializer<T>(tupleClass, duplicateFieldSerializers); + } else { + return this; + } + } + + @Override public T createInstance() { try { T t = tupleClass.newInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index 08df7d3..afc99d0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -32,27 +32,15 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { protected final Class<T> tupleClass; - protected final TypeSerializer<Object>[] fieldSerializers; + protected TypeSerializer<Object>[] fieldSerializers; protected final int arity; - protected final boolean stateful; - - @SuppressWarnings("unchecked") public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { this.tupleClass = tupleClass; this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers; this.arity = fieldSerializers.length; - - boolean stateful = false; - for (TypeSerializer<?> ser : fieldSerializers) { - if (ser.isStateful()) { - stateful = true; - break; - } - } - this.stateful = stateful; } @@ -62,11 +50,6 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return this.stateful; - } - - @Override public int getLength() { return -1; } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java index d6c35cb..ad1b0f0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java @@ -62,8 +62,8 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return true; + public ValueSerializer<T> duplicate() { + return new ValueSerializer<T>(type); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index c89733e..777122e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -98,8 +98,8 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> { } @Override - public boolean isStateful() { - return true; + public WritableSerializer duplicate() { + return new WritableSerializer(typeClass); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 64dae22..118e707 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -278,8 +278,8 @@ public class CollectionInputFormatTest { } @Override - public boolean isStateful() { - return false; + public TestSerializer duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index e5ece3f..09ef9ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.drivers; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -86,13 +86,13 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> { @SuppressWarnings("unchecked") public <X> void setInput1(MutableObjectIterator<X> input, TypeSerializer<X> serializer) { this.input1 = input; - this.serializer1 = new RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass()); + this.serializer1 = new RuntimeSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass()); } @SuppressWarnings("unchecked") public <X> void setInput2(MutableObjectIterator<X> input, TypeSerializer<X> serializer) { this.input2 = input; - this.serializer2 = new RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass()); + this.serializer2 = new RuntimeSerializerFactory<X>(serializer, (Class<X>) serializer.createInstance().getClass()); } public void setComparator1(TypeComparator<?> comparator) { http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java index 9dec847..63ca948 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java @@ -34,12 +34,11 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.util.MutableObjectIterator; import org.junit.Assert; @@ -95,7 +94,7 @@ public class MassiveStringSortingITCase { MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f); MutableObjectIterator<String> sortedData = sorter.getIterator(); @@ -187,7 +186,7 @@ public class MassiveStringSortingITCase { MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java index f2a3fc7..8368c61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java @@ -33,12 +33,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.types.StringValue; import org.apache.flink.util.MutableObjectIterator; @@ -91,7 +90,7 @@ public class MassiveStringValueSortingITCase { MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f); MutableObjectIterator<StringValue> sortedData = sorter.getIterator(); @@ -185,7 +184,7 @@ public class MassiveStringValueSortingITCase { MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f); http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index 2134bcd..69dfeb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -35,8 +35,8 @@ public class IntListSerializer extends TypeSerializer<IntList> { } @Override - public boolean isStateful() { - return false; + public IntListSerializer duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index 361585d..c2571cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -38,8 +38,8 @@ public class IntPairSerializer extends TypeSerializer<IntPair> { } @Override - public boolean isStateful() { - return false; + public IntPairSerializer duplicate() { + return this; } @Override @@ -105,12 +105,12 @@ public class IntPairSerializer extends TypeSerializer<IntPair> { public Class<IntPair> getDataType() { return IntPair.class; } - + @Override public int hashCode() { return 42; } - + public boolean equals(Object obj) { return obj.getClass() == IntPairSerializerFactory.class; }; http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index a38633c..388e8bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -35,8 +35,8 @@ public class StringPairSerializer extends TypeSerializer<StringPair> { } @Override - public boolean isStateful() { - return false; + public StringPairSerializer duplicate() { + return this; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index cba824a..91fb67a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.commons.lang.SerializationUtils import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataOutputView, DataInputView} @@ -29,12 +30,23 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} abstract class CaseClassSerializer[T <: Product]( clazz: Class[T], scalaFieldSerializers: Array[TypeSerializer[_]]) - extends TupleSerializerBase[T](clazz, scalaFieldSerializers) { + extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable { @transient var fields : Array[AnyRef] = _ @transient var instanceCreationFailed : Boolean = false + override def duplicate = { + val result = this.clone().asInstanceOf[CaseClassSerializer[T]] + + // set transient fields to null and make copy of serializers + result.fields = null + result.instanceCreationFailed = false + result.fieldSerializers = fieldSerializers.map(_.duplicate()) + + result + } + def createInstance: T = { if (instanceCreationFailed) { null.asInstanceOf[T] @@ -58,8 +70,6 @@ abstract class CaseClassSerializer[T <: Product]( } } - override def isStateful() = true - def copy(from: T, reuse: T): T = { copy(from) } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index d28e9dd..c14e27a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( val rightSerializer: TypeSerializer[B]) extends TypeSerializer[T] { - override def isStateful: Boolean = false + override def duplicate: EitherSerializer[A,B,T] = this override def createInstance: T = { Left(null).asInstanceOf[T] http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala index f25dd6c..bd076ed 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} */ class NothingSerializer extends TypeSerializer[Any] { - override def isStateful: Boolean = false + override def duplicate: NothingSerializer = this override def createInstance: Any = { Integer.valueOf(-1)
