Repository: flink Updated Branches: refs/heads/release-0.8 91382bb8c -> 02b6f85fe
http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 7e9e4e5..b52b711 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] { - override def isStateful: Boolean = false + override def duplicate: OptionSerializer[A] = this override def createInstance: Option[A] = { None http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala index 40071b7..9f46a1b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala @@ -29,13 +29,25 @@ import scala.collection.generic.CanBuildFrom * Serializer for Scala Collections. */ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( - val elementSerializer: TypeSerializer[E]) - extends TypeSerializer[T] { + var elementSerializer: TypeSerializer[E]) + extends TypeSerializer[T] with Cloneable { def getCbf: CanBuildFrom[T, E, T] @transient var cbf: CanBuildFrom[T, E, T] = getCbf + override def duplicate = { + val duplicateElementSerializer = elementSerializer.duplicate() + if (duplicateElementSerializer == elementSerializer) { + // is not stateful, so return ourselves + this + } else { + val result = this.clone().asInstanceOf[TraversableSerializer[T, E]] + result.elementSerializer = elementSerializer.duplicate() + result + } + } + private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() cbf = getCbf @@ -86,8 +98,6 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( } } - override def isStateful: Boolean = false - override def deserialize(source: DataInputView): T = { val len = source.readInt() val builder = cbf() http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index 530ca7a..b4b4841 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -29,7 +29,7 @@ import scala.util.{Success, Try, Failure} class TrySerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] { - override def isStateful: Boolean = false + override def duplicate: TrySerializer[A] = this val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable]) http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java index 822b4f2..751ced3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingl } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithAdjacencyList createInstance() { return new VertexWithAdjacencyList(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java index e972cd1..8ff0233 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSin } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithRankAndDangling createInstance() { return new VertexWithRankAndDangling(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java index 928d4f4..2c3abcd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java @@ -35,11 +35,6 @@ public final class VertexWithRankSerializer extends TypeSerializerSingleton<Vert } @Override - public boolean isStateful() { - return false; - } - - @Override public VertexWithRank createInstance() { return new VertexWithRank(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b6f85f/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index d09fe60..73630ff 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -31,8 +31,8 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.runtime.operators.sort.UnilateralSortMerger -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory -import org.junit.Assert._; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory +import org.junit.Assert._ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable class MassiveCaseClassSortingITCase { @@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase { sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]), + new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]), comparator, 1.0, 4, 0.8f) val sortedData = sorter.getIterator
