[FLINK-1463] Fix stateful/stateless Serializers and Comparators

Before, Serializers would announce whether they are stateful or not and
rely on RuntimeStatefulSerializerFactory to do the duplication.
Comparators, on the other hand, had a duplicate method that the user was
required to call.

This commit removes the stateful/stateless property from Serializers and
instead introduces a duplicate() method, similar to Comparators, that
can return the same instance.

The two serializer factories are merged into one that always calls
duplicate() before returning a serializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02b6f85f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02b6f85f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02b6f85f

Branch: refs/heads/release-0.8
Commit: 02b6f85fed8c1409b586e1e934acd72cee54adac
Parents: 91382bb
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Jan 30 16:43:31 2015 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Feb 9 14:20:23 2015 +0100

----------------------------------------------------------------------
 .../api/function/source/FileSourceFunction.java |  10 +-
 .../streamrecord/StreamRecordSerializer.java    |   4 +-
 .../compiler/postpass/JavaApiPostPass.java      |  11 +-
 .../api/common/typeutils/TypeSerializer.java    |  16 +--
 .../typeutils/base/BooleanSerializer.java       |   5 -
 .../typeutils/base/BooleanValueSerializer.java  |   5 -
 .../common/typeutils/base/ByteSerializer.java   |   5 -
 .../typeutils/base/ByteValueSerializer.java     |   5 -
 .../common/typeutils/base/CharSerializer.java   |   5 -
 .../typeutils/base/CharValueSerializer.java     |   5 -
 .../common/typeutils/base/DateSerializer.java   |   5 -
 .../common/typeutils/base/DoubleSerializer.java |   5 -
 .../typeutils/base/DoubleValueSerializer.java   |   5 -
 .../common/typeutils/base/EnumSerializer.java   |   4 +-
 .../common/typeutils/base/FloatSerializer.java  |   5 -
 .../typeutils/base/FloatValueSerializer.java    |   5 -
 .../typeutils/base/GenericArraySerializer.java  |  12 +-
 .../common/typeutils/base/IntSerializer.java    |   5 -
 .../typeutils/base/IntValueSerializer.java      |   5 -
 .../common/typeutils/base/LongSerializer.java   |   5 -
 .../typeutils/base/LongValueSerializer.java     |   5 -
 .../common/typeutils/base/ShortSerializer.java  |   5 -
 .../typeutils/base/ShortValueSerializer.java    |   5 -
 .../common/typeutils/base/StringSerializer.java |   5 -
 .../typeutils/base/StringValueSerializer.java   |   5 -
 .../typeutils/base/TypeSerializerSingleton.java |   7 +-
 .../common/typeutils/base/VoidSerializer.java   |   5 -
 .../array/BooleanPrimitiveArraySerializer.java  |   5 -
 .../array/BytePrimitiveArraySerializer.java     |   5 -
 .../array/CharPrimitiveArraySerializer.java     |   5 -
 .../array/DoublePrimitiveArraySerializer.java   |   5 -
 .../array/FloatPrimitiveArraySerializer.java    |   5 -
 .../base/array/IntPrimitiveArraySerializer.java |   5 -
 .../array/LongPrimitiveArraySerializer.java     |   5 -
 .../array/ShortPrimitiveArraySerializer.java    |   5 -
 .../base/array/StringArraySerializer.java       |   5 -
 .../typeutils/record/RecordSerializer.java      |   5 +-
 .../java/typeutils/runtime/AvroSerializer.java  |   4 +-
 .../runtime/CopyableValueSerializer.java        |   4 +-
 .../runtime/GenericTypeComparator.java          |  14 +-
 .../java/typeutils/runtime/KryoSerializer.java  |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  33 +++--
 .../runtime/RuntimeSerializerFactory.java       | 124 ++++++++++++++++
 .../RuntimeStatefulSerializerFactory.java       | 140 -------------------
 .../RuntimeStatelessSerializerFactory.java      | 120 ----------------
 .../typeutils/runtime/TupleComparatorBase.java  |  28 +---
 .../java/typeutils/runtime/TupleSerializer.java |  20 +++
 .../typeutils/runtime/TupleSerializerBase.java  |  19 +--
 .../java/typeutils/runtime/ValueSerializer.java |   4 +-
 .../typeutils/runtime/WritableSerializer.java   |   4 +-
 .../api/java/io/CollectionInputFormatTest.java  |   4 +-
 .../operators/drivers/TestTaskContext.java      |   6 +-
 .../sort/MassiveStringSortingITCase.java        |   7 +-
 .../sort/MassiveStringValueSortingITCase.java   |   7 +-
 .../testutils/types/IntListSerializer.java      |   4 +-
 .../testutils/types/IntPairSerializer.java      |   8 +-
 .../testutils/types/StringPairSerializer.java   |   4 +-
 .../scala/typeutils/CaseClassSerializer.scala   |  16 ++-
 .../api/scala/typeutils/EitherSerializer.scala  |   2 +-
 .../api/scala/typeutils/NothingSerializer.scala |   2 +-
 .../api/scala/typeutils/OptionSerializer.scala  |   2 +-
 .../scala/typeutils/TraversableSerializer.scala |  18 ++-
 .../api/scala/typeutils/TrySerializer.scala     |   2 +-
 .../VertexWithAdjacencyListSerializer.java      |   5 -
 .../VertexWithRankAndDanglingSerializer.java    |   5 -
 .../types/VertexWithRankSerializer.java         |   5 -
 .../misc/MassiveCaseClassSortingITCase.scala    |   6 +-
 67 files changed, 268 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 5dfe4b2..a5ef3a7 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -49,12 +48,7 @@ public class FileSourceFunction extends 
RichSourceFunction<String> {
        private static TypeSerializerFactory<String> 
createSerializer(TypeInformation<String> typeInfo) {
                TypeSerializer<String> serializer = typeInfo.createSerializer();
 
-               if (serializer.isStateful()) {
-                       return new 
RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
-               } else {
-                       return new 
RuntimeStatelessSerializerFactory<String>(serializer,
-                                       typeInfo.getTypeClass());
-               }
+               return new RuntimeSerializerFactory<String>(serializer, 
typeInfo.getTypeClass());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 85faa9e..98f12ec 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -47,8 +47,8 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public StreamRecordSerializer<T> duplicate() {
+               return this;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 2273461..e1b5bf8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -43,8 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
 import 
org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.CompilerPostPassException;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -278,12 +277,8 @@ public class JavaApiPostPass implements OptimizerPostPass {
        
        private static <T> TypeSerializerFactory<?> 
createSerializer(TypeInformation<T> typeInfo) {
                TypeSerializer<T> serializer = typeInfo.createSerializer();
-               
-               if (serializer.isStateful()) {
-                       return new 
RuntimeStatefulSerializerFactory<T>(serializer, typeInfo.getTypeClass());
-               } else {
-                       return new 
RuntimeStatelessSerializerFactory<T>(serializer, typeInfo.getTypeClass());
-               }
+
+               return new RuntimeSerializerFactory<T>(serializer, 
typeInfo.getTypeClass());
        }
        
        private static <T> TypeComparatorFactory<?> 
createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] 
sortOrder) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 5e32c86..329e826 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -50,17 +50,15 @@ public abstract class TypeSerializer<T> implements 
Serializable {
         */
        public abstract boolean isImmutableType();
        
-       
        /**
-        * Gets whether the serializer is stateful. Statefulness means in this 
context that some of the serializer's
-        * methods have objects with state and are thus not inherently 
thread-safe. A stateful serializer might be used by
-        * multiple threads concurrently. For a stateful one, different 
instances will be used by different threads.
-        * 
-        * @return True, if the serializer is stateful, false if it is 
stateless;
+        * Creates a deep copy of this serializer if it is necessary, i.e. if 
it is stateful. This
+        * can return itself if the serializer is not stateful.
+        *
+        * We need this because Serializers might be used in several threads. 
Stateless serializers
+        * are inherently thread-safe while stateful serializers might not be 
thread-safe.
         */
-       public abstract boolean isStateful();
-       
-       
+       public abstract TypeSerializer<T> duplicate();
+
        // 
--------------------------------------------------------------------------------------------
        // Instantiation & Cloning
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index ecfb3c2..a844ac8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -37,11 +37,6 @@ public final class BooleanSerializer extends 
TypeSerializerSingleton<Boolean> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Boolean createInstance() {
                return FALSE;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index 4795055..3aae95d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -38,11 +38,6 @@ public final class BooleanValueSerializer extends 
TypeSerializerSingleton<Boolea
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public BooleanValue createInstance() {
                return new BooleanValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index 32f3edd..92b3685 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -39,11 +39,6 @@ public final class ByteSerializer extends 
TypeSerializerSingleton<Byte> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Byte createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index 24cc98b..e523d5e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -38,11 +38,6 @@ public final class ByteValueSerializer extends 
TypeSerializerSingleton<ByteValue
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public ByteValue createInstance() {
                return new ByteValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index c46d3a0..181db56 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -39,11 +39,6 @@ public final class CharSerializer extends 
TypeSerializerSingleton<Character> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Character createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 71a8ef4..690509c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -37,11 +37,6 @@ public class CharValueSerializer extends 
TypeSerializerSingleton<CharValue> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public CharValue createInstance() {
                return new CharValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 4bd2ea8..6aa11eb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -37,11 +37,6 @@ public final class DateSerializer extends 
TypeSerializerSingleton<Date> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Date createInstance() {
                return new Date();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 8e09f7c..24af95c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -38,11 +38,6 @@ public final class DoubleSerializer extends 
TypeSerializerSingleton<Double> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-
-       @Override
        public Double createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index f4c7f37..34434f1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -38,11 +38,6 @@ public final class DoubleValueSerializer extends 
TypeSerializerSingleton<DoubleV
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public DoubleValue createInstance() {
                return new DoubleValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index 7ecf82a..643e4fa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -46,8 +46,8 @@ public final class EnumSerializer<T extends Enum<T>> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public EnumSerializer<T> duplicate() {
+               return this;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b1a46b0..c823783 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -38,11 +38,6 @@ public final class FloatSerializer extends 
TypeSerializerSingleton<Float> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-
-       @Override
        public Float createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 6ebb268..15d00b5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -38,11 +38,6 @@ public class FloatValueSerializer extends 
TypeSerializerSingleton<FloatValue> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public FloatValue createInstance() {
                return new FloatValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index c72132d..355abd0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -50,15 +50,21 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
                this.componentSerializer = componentSerializer;
                this.EMPTY = create(0);
        }
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
        }
 
        @Override
-       public boolean isStateful() {
-               return this.componentSerializer.isStateful();
+       public GenericArraySerializer<C> duplicate() {
+               TypeSerializer<C> duplicateComponentSerializer = 
this.componentSerializer.duplicate();
+               if (duplicateComponentSerializer == this.componentSerializer) {
+                       // is not stateful, return ourselves
+                       return this;
+               } else {
+                       return new GenericArraySerializer<C>(componentClass, 
duplicateComponentSerializer);
+               }
        }
 
        

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 2937b2a..778f044 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -39,11 +39,6 @@ public final class IntSerializer extends 
TypeSerializerSingleton<Integer> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Integer createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index ec1f345..c2d1b60 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -38,11 +38,6 @@ public final class IntValueSerializer extends 
TypeSerializerSingleton<IntValue>
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public IntValue createInstance() {
                return new IntValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 6b25596..6d8b758 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -39,11 +39,6 @@ public final class LongSerializer extends 
TypeSerializerSingleton<Long> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Long createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 95caf04..37dec40 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -38,11 +38,6 @@ public final class LongValueSerializer extends 
TypeSerializerSingleton<LongValue
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public LongValue createInstance() {
                return new LongValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index c6e7870..44e5e3e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -39,11 +39,6 @@ public final class ShortSerializer extends 
TypeSerializerSingleton<Short> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-
-       @Override
        public Short createInstance() {
                return ZERO;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ab58987..1dbe4a5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -38,11 +38,6 @@ public final class ShortValueSerializer extends 
TypeSerializerSingleton<ShortVal
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public ShortValue createInstance() {
                return new ShortValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 71221a2..7b26600 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -39,11 +39,6 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-
-       @Override
        public String createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index c5d5b55..7628cab 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -40,11 +40,6 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public StringValue createInstance() {
                return new StringValue();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 979d5ab..e076e5b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -25,7 +25,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
        private static final long serialVersionUID = 8766687317209282373L;
 
        // --------------------------------------------------------------------------------------------
-       
+
+       @Override
+       public TypeSerializerSingleton<T> duplicate() {
+               return this;
+       }
+
        @Override
        public int hashCode() {
                return super.hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index 33bb901..272ffbd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -35,11 +35,6 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public Void createInstance() {
                return null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index e9941a8..4a493ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -40,11 +40,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
        public boolean isImmutableType() {
                return false;
        }
-
-       @Override
-       public boolean isStateful() {
-               return false;
-       }
        
        @Override
        public boolean[] createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index aaf867f..fb4d506 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -41,11 +41,6 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public byte[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 64632bd..8e3c4ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public char[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 846ae74..10e25c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public double[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 8f42ac8..d57af00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public float[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index 2ab056c..eaff287 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public int[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 5d34dfe..55a22c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -42,11 +42,6 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public long[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 2f37033..08275b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -40,11 +40,6 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
        public boolean isImmutableType() {
                return false;
        }
-
-       @Override
-       public boolean isStateful() {
-               return false;
-       }
        
        @Override
        public short[] createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index d5ab030..ad172a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -44,11 +44,6 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public String[] createInstance() {
                return EMPTY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7b72e89..11b21d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -57,8 +57,9 @@ public final class RecordSerializer extends TypeSerializer<Record> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public RecordSerializer duplicate() {
+               // does not hold state, so just return ourselves
+               return this;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index cc72fa3..2758bd6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -79,8 +79,8 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return true;
+       public AvroSerializer duplicate() {
+               return new AvroSerializer(type, typeToInstantiate);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 8710f2d..193d495 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -47,8 +47,8 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public CopyableValueSerializer<T> duplicate() {
+               return this;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
index 7caa770..039cef7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
@@ -42,9 +41,7 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 
        private final Class<T> type;
 
-       private final TypeSerializerFactory<T> serializerFactory;
-
-       private transient TypeSerializer<T> serializer;
+       private TypeSerializer<T> serializer;
 
        private transient T reference;
 
@@ -61,15 +58,11 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
                this.ascending = ascending;
                this.serializer = serializer;
                this.type = type;
-
-               this.serializerFactory = this.serializer.isStateful()
-                               ? new 
RuntimeStatefulSerializerFactory<T>(this.serializer, this.type)
-                               : new 
RuntimeStatelessSerializerFactory<T>(this.serializer, this.type);
        }
 
        private GenericTypeComparator(GenericTypeComparator<T> toClone) {
                this.ascending = toClone.ascending;
-               this.serializerFactory = toClone.serializerFactory;
+               this.serializer = toClone.serializer.duplicate();
                this.type = toClone.type;
        }
 
@@ -104,9 +97,6 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
 
        @Override
        public int compareSerialized(final DataInputView firstSource, final 
DataInputView secondSource) throws IOException {
-               if (this.serializer == null) {
-                       this.serializer = 
this.serializerFactory.getSerializer();
-               }
 
                if (this.reference == null) {
                        this.reference = this.serializer.createInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 3a51534..a5f5f65 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -63,8 +63,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return true;
+       public KryoSerializer<T> duplicate() {
+               return new KryoSerializer<T>(this.type);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 99b9f65..7bd3243 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -42,9 +42,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
        private final int numFields;
 
-       private final boolean stateful;
-
-
        @SuppressWarnings("unchecked")
        public PojoSerializer(Class<T> clazz, TypeSerializer<?>[] 
fieldSerializers, Field[] fields) {
                this.clazz = clazz;
@@ -55,15 +52,6 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                for (int i = 0; i < numFields; i++) {
                        this.fields[i].setAccessible(true);
                }
-
-               boolean stateful = false;
-               for (TypeSerializer<?> ser : fieldSerializers) {
-                       if (ser.isStateful()) {
-                               stateful = true;
-                               break;
-                       }
-               }
-               this.stateful = stateful;
        }
 
        private void writeObject(ObjectOutputStream out)
@@ -109,10 +97,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return this.stateful;
+       public PojoSerializer<T> duplicate() {
+               boolean stateful = false;
+               TypeSerializer[] duplicateFieldSerializers = new 
TypeSerializer[fieldSerializers.length];
+
+               for (int i = 0; i < fieldSerializers.length; i++) {
+                       duplicateFieldSerializers[i] = 
fieldSerializers[i].duplicate();
+                       if (duplicateFieldSerializers[i] != 
fieldSerializers[i]) {
+                               // at least one of them is stateful
+                               stateful = true;
+                       }
+               }
+
+               if (stateful) {
+                       return new PojoSerializer<T>(clazz, 
duplicateFieldSerializers, fields);
+               } else {
+                       return this;
+               }
        }
-       
+
        
        @Override
        public T createInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
new file mode 100644
index 0000000..96aff73
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class RuntimeSerializerFactory<T> implements 
TypeSerializerFactory<T>, java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+       
+
+       private static final String CONFIG_KEY_SER = "SER_DATA";
+
+       private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
+
+       
+       private TypeSerializer<T> serializer;
+
+       private boolean firstSerializer = true;
+
+       private Class<T> clazz;
+
+       // Because we read the class from the TaskConfig and instantiate ourselves
+       public RuntimeSerializerFactory() {}
+
+       public RuntimeSerializerFactory(TypeSerializer<T> serializer, Class<T> 
clazz) {
+               if (serializer == null || clazz == null) {
+                       throw new NullPointerException();
+               }
+
+               this.clazz = clazz;
+               this.serializer = serializer;
+       }
+
+
+       @Override
+       public void writeParametersToConfig(Configuration config) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(clazz, config, 
CONFIG_KEY_CLASS);
+                       InstantiationUtil.writeObjectToConfig(serializer, 
config, CONFIG_KEY_SER);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not serialize 
serializer into the configuration.", e);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void readParametersFromConfig(Configuration config, ClassLoader 
cl) throws ClassNotFoundException {
+               if (config == null || cl == null) {
+                       throw new NullPointerException();
+               }
+               
+               try {
+                       this.clazz = (Class<T>) 
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
+                       this.serializer = (TypeSerializer<T>)  
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
+                       firstSerializer = true;
+               }
+               catch (ClassNotFoundException e) {
+                       throw e;
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not load deserializer 
from the configuration.", e);
+               }
+       }
+
+       @Override
+       public TypeSerializer<T> getSerializer() {
+               if (this.serializer != null) {
+                       if (firstSerializer) {
+                               firstSerializer = false;
+                               return this.serializer;
+                       } else {
+                               return this.serializer.duplicate();
+                       }
+               } else {
+                       throw new RuntimeException("SerializerFactory has not 
been initialized from configuration.");
+               }
+       }
+
+       @Override
+       public Class<T> getDataType() {
+               return clazz;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       @Override
+       public int hashCode() {
+               return clazz.hashCode() ^ serializer.hashCode();
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj != null && obj instanceof RuntimeSerializerFactory) {
+                       RuntimeSerializerFactory<?> other = 
(RuntimeSerializerFactory<?>) obj;
+                       
+                       return this.clazz == other.clazz &&
+                                       
this.serializer.equals(other.serializer);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
deleted file mode 100644
index 19cd3b7..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeStatefulSerializerFactory<T> implements 
TypeSerializerFactory<T>, java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final String CONFIG_KEY_SER = "SER_DATA";
-
-       private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-       private byte[] serializerData;
-       
-       private TypeSerializer<T> serializer;           // only for equality 
comparisons
-       
-       private transient ClassLoader loader;
-
-       private Class<T> clazz;
-
-       public RuntimeStatefulSerializerFactory() {}
-
-       public RuntimeStatefulSerializerFactory(TypeSerializer<T> serializer, 
Class<T> clazz) {
-               this.clazz = clazz;
-               this.loader = serializer.getClass().getClassLoader();
-               
-               try {
-                       this.serializerData = 
InstantiationUtil.serializeObject(serializer);
-               } catch (IOException e) {
-                       throw new RuntimeException("Cannt serialize the 
Serializer.", e);
-               }
-       }
-
-
-       @Override
-       public void writeParametersToConfig(Configuration config) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(clazz, config, 
CONFIG_KEY_CLASS);
-                       config.setBytes(CONFIG_KEY_SER, this.serializerData);
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not serialize 
serializer into the configuration.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void readParametersFromConfig(Configuration config, ClassLoader 
cl) throws ClassNotFoundException {
-               if (config == null || cl == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.serializerData = config.getBytes(CONFIG_KEY_SER, null);
-               if (this.serializerData == null) {
-                       throw new RuntimeException("Could not find deserializer 
in the configuration."); 
-               }
-               
-               this.loader = cl;
-               
-               try {
-                       this.clazz = (Class<T>) 
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-               }
-               catch (ClassNotFoundException e) {
-                       throw e;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not load deserializer 
from the configuration.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeSerializer<T> getSerializer() {
-               if (serializerData != null) {
-                       try {
-                               return (TypeSerializer<T>) 
InstantiationUtil.deserializeObject(this.serializerData, this.loader);
-                       } catch (Exception e) {
-                               throw new RuntimeException("Repeated 
instantiation of serializer failed.", e);
-                       }
-               } else {
-                       throw new RuntimeException("SerializerFactory has not 
been initialized from configuration.");
-               }
-       }
-
-       @Override
-       public Class<T> getDataType() {
-               return this.clazz;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return clazz.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj instanceof 
RuntimeStatefulSerializerFactory) {
-                       @SuppressWarnings("unchecked")
-                       RuntimeStatefulSerializerFactory<T> other = 
(RuntimeStatefulSerializerFactory<T>) obj;
-                       
-                       if (this.serializer == null) {
-                               this.serializer = getSerializer();
-                       }
-                       if (other.serializer == null) {
-                               other.serializer = other.getSerializer();
-                       }
-                       
-                       return this.clazz == other.clazz &&
-                                       
this.serializer.equals(other.serializer);
-               } else {
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
deleted file mode 100644
index 041b824..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class RuntimeStatelessSerializerFactory<T> implements 
TypeSerializerFactory<T>, java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-       
-
-       private static final String CONFIG_KEY_SER = "SER_DATA";
-
-       private static final String CONFIG_KEY_CLASS = "CLASS_DATA";
-
-       
-       private TypeSerializer<T> serializer;
-
-       private Class<T> clazz;
-
-
-       public RuntimeStatelessSerializerFactory() {}
-
-       public RuntimeStatelessSerializerFactory(TypeSerializer<T> serializer, 
Class<T> clazz) {
-               if (serializer == null || clazz == null) {
-                       throw new NullPointerException();
-               }
-               
-               if (serializer.isStateful()) {
-                       throw new IllegalArgumentException("Cannot use the 
stateless serializer factory with a stateful serializer.");
-               }
-               
-               this.clazz = clazz;
-               this.serializer = serializer;
-       }
-
-
-       @Override
-       public void writeParametersToConfig(Configuration config) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(clazz, config, 
CONFIG_KEY_CLASS);
-                       InstantiationUtil.writeObjectToConfig(serializer, 
config, CONFIG_KEY_SER);
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not serialize 
serializer into the configuration.", e);
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void readParametersFromConfig(Configuration config, ClassLoader 
cl) throws ClassNotFoundException {
-               if (config == null || cl == null) {
-                       throw new NullPointerException();
-               }
-               
-               try {
-                       this.clazz = (Class<T>) 
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-                       this.serializer = (TypeSerializer<T>)  
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
-               }
-               catch (ClassNotFoundException e) {
-                       throw e;
-               }
-               catch (Exception e) {
-                       throw new RuntimeException("Could not load deserializer 
from the configuration.", e);
-               }
-       }
-
-       @Override
-       public TypeSerializer<T> getSerializer() {
-               if (this.serializer != null) {
-                       return this.serializer;
-               } else {
-                       throw new RuntimeException("SerializerFactory has not 
been initialized from configuration.");
-               }
-       }
-
-       @Override
-       public Class<T> getDataType() {
-               return clazz;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return clazz.hashCode() ^ serializer.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj instanceof 
RuntimeStatelessSerializerFactory) {
-                       RuntimeStatelessSerializerFactory<?> other = 
(RuntimeStatelessSerializerFactory<?>) obj;
-                       
-                       return this.clazz == other.clazz &&
-                                       
this.serializer.equals(other.serializer);
-               } else {
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
index abcf89c..28169e5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.KeyFieldOutOfBoundsException;
@@ -41,10 +40,6 @@ public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T>
        @SuppressWarnings("rawtypes")
        protected TypeComparator[] comparators;
 
-       /** serializer factories to duplicate non thread-safe serializers */
-       protected TypeSerializerFactory<Object>[] serializerFactories;
-
-
        protected int[] normalizedKeyLengths;
 
        protected int numLeadingNormalizableKeys;
@@ -56,7 +51,7 @@ public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T>
 
        /** serializers to deserialize the first n fields for comparison */
        @SuppressWarnings("rawtypes")
-       protected transient TypeSerializer[] serializers;
+       protected TypeSerializer[] serializers;
 
        // cache for the deserialized field objects
        protected transient Object[] deserializedFields1;
@@ -70,14 +65,6 @@ public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T>
                this.comparators = (TypeComparator<Object>[]) comparators;
                this.serializers = (TypeSerializer<Object>[]) serializers;
 
-               // set the serializer factories.
-               this.serializerFactories = new 
TypeSerializerFactory[this.serializers.length];
-               for (int i = 0; i < serializers.length; i++) {
-                       this.serializerFactories[i] = 
this.serializers[i].isStateful() ?
-                                       new 
RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
-                                       new 
RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
-               }
-
                // set up auxiliary fields for normalized key support
                this.normalizedKeyLengths = new int[keyPositions.length];
                int nKeys = 0;
@@ -129,7 +116,11 @@ public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T>
        protected void privateDuplicate(TupleComparatorBase<T> toClone) {
                // copy fields and serializer factories
                this.keyPositions = toClone.keyPositions;
-               this.serializerFactories = toClone.serializerFactories;
+
+               this.serializers = new 
TypeSerializer[toClone.serializers.length];
+               for (int i = 0; i < toClone.serializers.length; i++) {
+                       this.serializers[i] = 
toClone.serializers[i].duplicate();
+               }
 
                this.comparators = new 
TypeComparator[toClone.comparators.length];
                for (int i = 0; i < toClone.comparators.length; i++) {
@@ -261,13 +252,6 @@ public abstract class TupleComparatorBase<T> extends 
CompositeTypeComparator<T>
        // 
--------------------------------------------------------------------------------------------
        
        protected final void instantiateDeserializationUtils() {
-               if (this.serializers == null) {
-                       this.serializers = new 
TypeSerializer[this.serializerFactories.length];
-                       for (int i = 0; i < this.serializers.length; i++) {
-                               this.serializers[i] = 
this.serializerFactories[i].getSerializer();
-                       }
-               }
-               
                this.deserializedFields1 = new Object[this.serializers.length];
                this.deserializedFields2 = new Object[this.serializers.length];
                

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index ae429a7..9071446 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -36,6 +36,26 @@ public final class TupleSerializer<T extends Tuple> extends 
TupleSerializerBase<
        }
 
        @Override
+       public TupleSerializer<T> duplicate() {
+               boolean stateful = false;
+               TypeSerializer[] duplicateFieldSerializers = new 
TypeSerializer[fieldSerializers.length];
+
+               for (int i = 0; i < fieldSerializers.length; i++) {
+                       duplicateFieldSerializers[i] = 
fieldSerializers[i].duplicate();
+                       if (duplicateFieldSerializers[i] != 
fieldSerializers[i]) {
+                               // at least one of them is stateful
+                               stateful = true;
+                       }
+               }
+
+               if (stateful) {
+                       return new TupleSerializer<T>(tupleClass, 
duplicateFieldSerializers);
+               } else {
+                       return this;
+               }
+       }
+
+       @Override
        public T createInstance() {
                try {
                        T t = tupleClass.newInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 08df7d3..afc99d0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -32,27 +32,15 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
 
        protected final Class<T> tupleClass;
 
-       protected final TypeSerializer<Object>[] fieldSerializers;
+       protected TypeSerializer<Object>[] fieldSerializers;
 
        protected final int arity;
 
-       protected final boolean stateful;
-
-
        @SuppressWarnings("unchecked")
        public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers) {
                this.tupleClass = tupleClass;
                this.fieldSerializers = (TypeSerializer<Object>[]) 
fieldSerializers;
                this.arity = fieldSerializers.length;
-               
-               boolean stateful = false;
-               for (TypeSerializer<?> ser : fieldSerializers) {
-                       if (ser.isStateful()) {
-                               stateful = true;
-                               break;
-                       }
-               }
-               this.stateful = stateful;
        }
        
        
@@ -62,11 +50,6 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return this.stateful;
-       }
-
-       @Override
        public int getLength() {
                return -1;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index d6c35cb..ad1b0f0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -62,8 +62,8 @@ public class ValueSerializer<T extends Value> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public boolean isStateful() {
-               return true;
+       public ValueSerializer<T> duplicate() {
+               return new ValueSerializer<T>(type);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index c89733e..777122e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -98,8 +98,8 @@ public class WritableSerializer<T extends Writable> extends 
TypeSerializer<T> {
        }
        
        @Override
-       public boolean isStateful() {
-               return true;
+       public WritableSerializer duplicate() {
+               return new WritableSerializer(typeClass);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 64dae22..118e707 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -278,8 +278,8 @@ public class CollectionInputFormatTest {
                }
 
                @Override
-               public boolean isStateful() {
-                       return false;
+               public TestSerializer duplicate() {
+                       return this;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index e5ece3f..09ef9ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.drivers;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,13 +86,13 @@ public class TestTaskContext<S, T> implements 
PactTaskContext<S, T> {
        @SuppressWarnings("unchecked")
        public <X> void setInput1(MutableObjectIterator<X> input, 
TypeSerializer<X> serializer) {
                this.input1 = input;
-               this.serializer1 = new 
RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) 
serializer.createInstance().getClass());
+               this.serializer1 = new RuntimeSerializerFactory<X>(serializer, 
(Class<X>) serializer.createInstance().getClass());
        }
 
        @SuppressWarnings("unchecked")
        public <X> void setInput2(MutableObjectIterator<X> input, 
TypeSerializer<X> serializer) {
                this.input2 = input;
-               this.serializer2 = new 
RuntimeStatefulSerializerFactory<X>(serializer, (Class<X>) 
serializer.createInstance().getClass());
+               this.serializer2 = new RuntimeSerializerFactory<X>(serializer, 
(Class<X>) serializer.createInstance().getClass());
        }
        
        public void setComparator1(TypeComparator<?> comparator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 9dec847..63ca948 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -34,12 +34,11 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Assert;
@@ -95,7 +94,7 @@ public class MassiveStringSortingITCase {
                                MutableObjectIterator<String> inputIterator = 
new StringReaderMutableObjectIterator(reader);
                                
                                sorter = new UnilateralSortMerger<String>(mm, 
ioMan, inputIterator, new DummyInvokable(),
-                                               new 
RuntimeStatelessSerializerFactory<String>(serializer, String.class), 
comparator, 1.0, 4, 0.8f);
+                                               new 
RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 
0.8f);
 
                                MutableObjectIterator<String> sortedData = 
sorter.getIterator();
                                
@@ -187,7 +186,7 @@ public class MassiveStringSortingITCase {
                                MutableObjectIterator<Tuple2<String, String[]>> 
inputIterator = new StringTupleReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new 
DummyInvokable(),
-                                               new 
RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, 
(Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 
0.8f);
+                                               new 
RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, 
(Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 
0.8f);
 
                                
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
index f2a3fc7..8368c61 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java
@@ -33,12 +33,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
 import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
@@ -91,7 +90,7 @@ public class MassiveStringValueSortingITCase {
                                MutableObjectIterator<StringValue> 
inputIterator = new StringValueReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new 
DummyInvokable(),
-                                               new 
RuntimeStatelessSerializerFactory<StringValue>(serializer, StringValue.class), 
comparator, 1.0, 4, 0.8f);
+                                               new 
RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), 
comparator, 1.0, 4, 0.8f);
 
                                MutableObjectIterator<StringValue> sortedData = 
sorter.getIterator();
                                
@@ -185,7 +184,7 @@ public class MassiveStringValueSortingITCase {
                                MutableObjectIterator<Tuple2<StringValue, 
StringValue[]>> inputIterator = new 
StringValueTupleReaderMutableObjectIterator(reader);
                                
                                sorter = new 
UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, 
inputIterator, new DummyInvokable(),
-                                               new 
RuntimeStatelessSerializerFactory<Tuple2<StringValue, 
StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) 
(Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+                                               new 
RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, 
(Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), 
comparator, 1.0, 4, 0.8f);
 
                                
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 2134bcd..69dfeb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -35,8 +35,8 @@ public class IntListSerializer extends 
TypeSerializer<IntList> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public IntListSerializer duplicate() {
+               return this;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 361585d..c2571cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -38,8 +38,8 @@ public class IntPairSerializer extends 
TypeSerializer<IntPair> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public IntPairSerializer duplicate() {
+               return this;
        }
 
        @Override
@@ -105,12 +105,12 @@ public class IntPairSerializer extends 
TypeSerializer<IntPair> {
                public Class<IntPair> getDataType() {
                        return IntPair.class;
                }
-               
+
                @Override
                public int hashCode() {
                        return 42;
                }
-               
+
                public boolean equals(Object obj) {
                        return obj.getClass() == IntPairSerializerFactory.class;
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index a38633c..388e8bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -35,8 +35,8 @@ public class StringPairSerializer extends 
TypeSerializer<StringPair> {
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
+       public StringPairSerializer duplicate() {
+               return this;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index cba824a..91fb67a 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.commons.lang.SerializationUtils
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -29,12 +30,23 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 abstract class CaseClassSerializer[T <: Product](
     clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
-  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
+  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable {
 
   @transient var fields : Array[AnyRef] = _
 
   @transient var instanceCreationFailed : Boolean = false
 
+  override def duplicate = {
+    val result = this.clone().asInstanceOf[CaseClassSerializer[T]]
+
+    // set transient fields to null and make copy of serializers
+    result.fields = null
+    result.instanceCreationFailed = false
+    result.fieldSerializers = fieldSerializers.map(_.duplicate())
+
+    result
+  }
+
   def createInstance: T = {
     if (instanceCreationFailed) {
       null.asInstanceOf[T]
@@ -58,8 +70,6 @@ abstract class CaseClassSerializer[T <: Product](
     }
   }
 
-  override def isStateful() = true
-
   def copy(from: T, reuse: T): T = {
     copy(from)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index d28e9dd..c14e27a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
     val rightSerializer: TypeSerializer[B])
   extends TypeSerializer[T] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: EitherSerializer[A,B,T] = this
 
   override def createInstance: T = {
     Left(null).asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index f25dd6c..bd076ed 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
  */
 class NothingSerializer extends TypeSerializer[Any] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: NothingSerializer = this
 
   override def createInstance: Any = {
     Integer.valueOf(-1)

Reply via email to