Repository: flink
Updated Branches:
  refs/heads/release-0.8 91382bb8c -> 02b6f85fe


http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7e9e4e5..b52b711 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.{DataOutputView, 
DataInputView}
 class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   extends TypeSerializer[Option[A]] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: OptionSerializer[A] = this
 
   override def createInstance: Option[A] = {
     None

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 40071b7..9f46a1b 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -29,13 +29,25 @@ import scala.collection.generic.CanBuildFrom
  * Serializer for Scala Collections.
  */
 abstract class TraversableSerializer[T <: TraversableOnce[E], E](
-    val elementSerializer: TypeSerializer[E])
-  extends TypeSerializer[T] {
+    var elementSerializer: TypeSerializer[E])
+  extends TypeSerializer[T] with Cloneable {
 
   def getCbf: CanBuildFrom[T, E, T]
 
   @transient var cbf: CanBuildFrom[T, E, T] = getCbf
 
+  override def duplicate = {
+    val duplicateElementSerializer = elementSerializer.duplicate()
+    if (duplicateElementSerializer == elementSerializer) {
+      // is not stateful, so return ourselves
+      this
+    } else {
+      val result = this.clone().asInstanceOf[TraversableSerializer[T, E]]
+      result.elementSerializer = elementSerializer.duplicate()
+      result
+    }
+  }
+
   private def readObject(in: ObjectInputStream): Unit = {
     in.defaultReadObject()
     cbf = getCbf
@@ -86,8 +98,6 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], 
E](
     }
   }
 
-  override def isStateful: Boolean = false
-
   override def deserialize(source: DataInputView): T = {
     val len = source.readInt()
     val builder = cbf()

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 530ca7a..b4b4841 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -29,7 +29,7 @@ import scala.util.{Success, Try, Failure}
 class TrySerializer[A](val elemSerializer: TypeSerializer[A])
   extends TypeSerializer[Try[A]] {
 
-  override def isStateful: Boolean = false
+  override def duplicate: TrySerializer[A] = this
 
   val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
index 822b4f2..751ced3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithAdjacencyListSerializer extends 
TypeSerializerSingl
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithAdjacencyList createInstance() {
                return new VertexWithAdjacencyList();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
index e972cd1..8ff0233 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithRankAndDanglingSerializer 
extends TypeSerializerSin
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithRankAndDangling createInstance() {
                return new VertexWithRankAndDangling();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
index 928d4f4..2c3abcd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
@@ -35,11 +35,6 @@ public final class VertexWithRankSerializer extends 
TypeSerializerSingleton<Vert
        }
 
        @Override
-       public boolean isStateful() {
-               return false;
-       }
-       
-       @Override
        public VertexWithRank createInstance() {
                return new VertexWithRank();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index d09fe60..73630ff 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -31,8 +31,8 @@ import 
org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
-import 
org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
-import org.junit.Assert._;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory
+import org.junit.Assert._
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 
 class MassiveCaseClassSortingITCase {
@@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase {
         
         sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, 
inputIterator,
               new DummyInvokable(), 
-              new RuntimeStatelessSerializerFactory[StringTuple](serializer, 
classOf[StringTuple]),
+              new RuntimeSerializerFactory[StringTuple](serializer, 
classOf[StringTuple]),
               comparator, 1.0, 4, 0.8f)
             
         val sortedData = sorter.getIterator

Reply via email to