This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4248de26c3500f86596c2d06c91a9c76c576f3cd Author: klion26 <[email protected]> AuthorDate: Thu Apr 30 15:33:02 2020 +0800 [FLINK-13632] Port TraversableSerializer upgrade test to TypeSerializerUpgradeTestBase --- .../flink-1.6-traversable-serializer-bitset-data | Bin 140 -> 0 bytes ...link-1.6-traversable-serializer-bitset-snapshot | Bin 992 -> 0 bytes ...link-1.6-traversable-serializer-indexedseq-data | Bin 130 -> 0 bytes ...-1.6-traversable-serializer-indexedseq-snapshot | Bin 992 -> 0 bytes ...flink-1.6-traversable-serializer-linearseq-data | Bin 130 -> 0 bytes ...k-1.6-traversable-serializer-linearseq-snapshot | Bin 992 -> 0 bytes .../flink-1.6-traversable-serializer-map-data | Bin 138 -> 0 bytes .../flink-1.6-traversable-serializer-map-snapshot | Bin 3033 -> 0 bytes ...nk-1.6-traversable-serializer-mutable-list-data | Bin 160 -> 0 bytes ....6-traversable-serializer-mutable-list-snapshot | Bin 993 -> 0 bytes .../flink-1.6-traversable-serializer-seq-data | Bin 130 -> 0 bytes .../flink-1.6-traversable-serializer-seq-snapshot | Bin 992 -> 0 bytes .../flink-1.6-traversable-serializer-set-data | Bin 135 -> 0 bytes .../flink-1.6-traversable-serializer-set-snapshot | Bin 992 -> 0 bytes ...1.6-traversable-serializer-with-case-class-data | Bin 145 -> 0 bytes ...traversable-serializer-with-case-class-snapshot | Bin 3036 -> 0 bytes ...flink-1.6-traversable-serializer-with-pojo-data | Bin 253 -> 0 bytes ...k-1.6-traversable-serializer-with-pojo-snapshot | Bin 6854 -> 0 bytes .../flink-1.7-traversable-serializer-bitset-data | Bin 140 -> 0 bytes ...link-1.7-traversable-serializer-bitset-snapshot | Bin 987 -> 0 bytes ...link-1.7-traversable-serializer-indexedseq-data | Bin 130 -> 0 bytes ...-1.7-traversable-serializer-indexedseq-snapshot | Bin 987 -> 0 bytes ...flink-1.7-traversable-serializer-linearseq-data | Bin 130 -> 0 bytes ...k-1.7-traversable-serializer-linearseq-snapshot | Bin 987 -> 0 bytes .../flink-1.7-traversable-serializer-map-data | Bin 138 -> 0 bytes .../flink-1.7-traversable-serializer-map-snapshot | Bin 3905 -> 0 bytes ...nk-1.7-traversable-serializer-mutable-list-data | Bin 160 -> 0 bytes ....7-traversable-serializer-mutable-list-snapshot | Bin 988 -> 0 bytes .../flink-1.7-traversable-serializer-seq-data | Bin 130 -> 0 bytes .../flink-1.7-traversable-serializer-seq-snapshot | Bin 987 -> 0 bytes .../flink-1.7-traversable-serializer-set-data | Bin 135 -> 0 bytes .../flink-1.7-traversable-serializer-set-snapshot | Bin 987 -> 0 bytes ...1.7-traversable-serializer-with-case-class-data | Bin 145 -> 0 bytes ...traversable-serializer-with-case-class-snapshot | Bin 3909 -> 0 bytes ...flink-1.7-traversable-serializer-with-pojo-data | Bin 253 -> 0 bytes ...k-1.7-traversable-serializer-with-pojo-snapshot | Bin 9607 -> 0 bytes ...raversableSerializerSnapshotMigrationTest.scala | 118 -------- .../TraversableSerializerUpgradeTest.scala | 326 +++++++++++++++++++++ 38 files changed, 326 insertions(+), 118 deletions(-) diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data deleted file mode 100644 index d6c225f..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot deleted file mode 100644 index fd9e834..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot deleted file mode 100644 index 9c51467..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot deleted file mode 100644 index 46ceea1..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data deleted file mode 100644 index bf82319..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot deleted file mode 100644 index d8b191e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data deleted file mode 100644 index ca814e2..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot deleted file mode 100644 index 7e4befd..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot deleted file mode 100644 index db73915..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data deleted file mode 100644 index 2380dd3..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot deleted file mode 100644 index e5f86d1..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data deleted file mode 100644 index fc90039..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot deleted file mode 100644 index ac3573e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data deleted file mode 100644 index 63003a8..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot deleted file mode 100644 index 085ec94..0000000 Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data deleted file mode 100644 index d6c225f..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot deleted file mode 100644 index 90c34f5..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot deleted file mode 100644 index 72ed196..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot deleted file mode 100644 index b9732f5..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data deleted file mode 100644 index bf82319..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot deleted file mode 100644 index 598afc5..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data deleted file mode 100644 index ca814e2..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot deleted file mode 100644 index bf8c7d3..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data deleted file mode 100644 index 328992e..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot deleted file mode 100644 index e7cb734..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data deleted file mode 100644 index 2380dd3..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot deleted file mode 100644 index 463fe1f..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data deleted file mode 100644 index fc90039..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot deleted file mode 100644 index fb4085f..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data deleted file mode 100644 index 63003a8..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data and /dev/null differ diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot deleted file mode 100644 index 4334c76..0000000 Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot and /dev/null differ diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala deleted file mode 100644 index 0352e6d..0000000 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.typeutils - -import java.util -import java.util.function.Supplier - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase} -import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.testutils.migration.MigrationVersion -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.{BitSet, LinearSeq, mutable} - -/** - * [[TraversableSerializer]] migration test. - */ -@RunWith(classOf[Parameterized]) -class TraversableSerializerSnapshotMigrationTest( - testSpecification: TypeSerializerSnapshotMigrationTestBase.TestSpecification[ - TraversableOnce[_] - ]) extends TypeSerializerSnapshotMigrationTestBase[TraversableOnce[_]]( - testSpecification - ) - -object TraversableSerializerSnapshotMigrationTest { - - object Types { - - class Pojo(var name: String, var count: Int) { - def this() = this("", -1) - - override def equals(other: Any): Boolean = { - other match { - case oP: Pojo => name == oP.name && count == oP.count - case _ => false - } - } - } - - val seqTypeInfo = implicitly[TypeInformation[Seq[Int]]] - val indexedSeqTypeInfo = - implicitly[TypeInformation[IndexedSeq[Int]]] - val linearSeqTypeInfo = implicitly[TypeInformation[LinearSeq[Int]]] - val mapTypeInfo = implicitly[TypeInformation[Map[String, Int]]] - val setTypeInfo = implicitly[TypeInformation[Set[Int]]] - val bitsetTypeInfo = implicitly[TypeInformation[BitSet]] - val mutableListTypeInfo = - implicitly[TypeInformation[mutable.MutableList[Int]]] - val seqTupleTypeInfo = implicitly[TypeInformation[Seq[(Int, String)]]] - val seqPojoTypeInfo = implicitly[TypeInformation[Seq[Pojo]]] - } - - import Types._ - - @SuppressWarnings(Array("unchecked")) - @Parameterized.Parameters(name = "Test Specification = {0}") - def testSpecifications: util.Collection[ - TypeSerializerSnapshotMigrationTestBase.TestSpecification[_]] = { - - val testSpecifications: TypeSerializerSnapshotMigrationTestBase.TestSpecifications = - new TypeSerializerSnapshotMigrationTestBase.TestSpecifications( - MigrationVersion.v1_6, - MigrationVersion.v1_7) - - val serializerSpecs = Seq( - ("bitset", bitsetTypeInfo), - ("indexedseq", indexedSeqTypeInfo), - ("linearseq", linearSeqTypeInfo), - ("map", mapTypeInfo), - ("mutable-list", mutableListTypeInfo), - ("seq", seqTypeInfo), - ("set", setTypeInfo), - ("with-case-class", seqTupleTypeInfo), - ("with-pojo", seqPojoTypeInfo) - ) - - serializerSpecs foreach { - case (data, typeInfo) => - testSpecifications.add( - s"traversable-serializer-$data", - classOf[TraversableSerializer[_, _]], - classOf[TraversableSerializerSnapshot[_, _]], - new TypeSerializerSupplier(typeInfo) - ) - } - - testSpecifications.get - } - - private class TypeSerializerSupplier[T](typeInfo: TypeInformation[T]) - extends Supplier[TypeSerializer[T]] { - override def get(): TypeSerializer[T] = { - typeInfo - .createSerializer(new ExecutionConfig) - .asInstanceOf[TypeSerializer[T]] - } - } -} diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala new file mode 100644 index 0000000..0d503a9 --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import java.util +import java.util.function.Supplier + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase.TestSpecification +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerMatchers, TypeSerializerSchemaCompatibility, TypeSerializerUpgradeTestBase} +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.testutils.migration.MigrationVersion +import org.hamcrest.Matcher +import org.hamcrest.Matchers.is +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.{BitSet, LinearSeq, mutable} + +/** + * A [[TypeSerializerUpgradeTestBase]] for [[TraversableSerializer]]. + */ +@RunWith(classOf[Parameterized]) +class TraversableSerializerUpgradeTest( + testSpecification: TypeSerializerUpgradeTestBase.TestSpecification[ +TraversableOnce[_], TraversableOnce[_]]) + extends TypeSerializerUpgradeTestBase[TraversableOnce[_], TraversableOnce[_]](testSpecification) + +object TraversableSerializerUpgradeTest { + + object Types { + + class Pojo(var name: String, var count: Int) { + def this() = this("", -1) + + override def equals(other: Any): Boolean = { + other match { + case oP: Pojo => name == oP.name && count == oP.count + case _ => false + } + } + } + + val seqTypeInfo = implicitly[TypeInformation[Seq[Int]]] + val indexedSeqTypeInfo = + implicitly[TypeInformation[IndexedSeq[Int]]] + val linearSeqTypeInfo = implicitly[TypeInformation[LinearSeq[Int]]] + val mapTypeInfo = implicitly[TypeInformation[Map[String, Int]]] + val setTypeInfo = implicitly[TypeInformation[Set[Int]]] + val bitsetTypeInfo = implicitly[TypeInformation[BitSet]] + val mutableListTypeInfo = + implicitly[TypeInformation[mutable.MutableList[Int]]] + val seqTupleTypeInfo = implicitly[TypeInformation[Seq[(Int, String)]]] + val seqPojoTypeInfo = implicitly[TypeInformation[Seq[Pojo]]] + } + + import Types._ + + @Parameterized.Parameters(name = "Test Specification = {0}") + def testSpecifications: util.Collection[TestSpecification[_, _]] = { + + val testSpecifications = + new util.ArrayList[TypeSerializerUpgradeTestBase.TestSpecification[_, _]] + for (migrationVersion <- TypeSerializerUpgradeTestBase.MIGRATION_VERSIONS) { + testSpecifications.add( + new TestSpecification[BitSet, BitSet]( + "traversable-serializer-bitset", + migrationVersion, + classOf[BitsetSerializerSetup], + classOf[BitsetSerializerVerifier])) + testSpecifications.add( + new TestSpecification[IndexedSeq[Int], IndexedSeq[Int]]( + "traversable-serializer-indexedseq", + migrationVersion, + classOf[IndexedSeqSerializerSetup], + classOf[IndexedSeqSerializerVerifier])) + testSpecifications.add( + new TestSpecification[LinearSeq[Int], LinearSeq[Int]]( + "traversable-serializer-linearseq", + migrationVersion, + classOf[LinearSeqSerializerSetup], + classOf[LinearSeqSerializerVerifier])) + testSpecifications.add( + new TestSpecification[Map[String, Int], Map[String, Int]]( + "traversable-serializer-map", + migrationVersion, + classOf[MapSerializerSetup], + classOf[MapSerializerVerifier])) + testSpecifications.add( + new TestSpecification[mutable.MutableList[Int], mutable.MutableList[Int]]( + "traversable-serializer-mutable-list", + migrationVersion, + classOf[MutableListSerializerSetup], + classOf[MutableListSerializerVerifier])) + testSpecifications.add( + new TestSpecification[Seq[Int], Seq[Int]]( + "traversable-serializer-seq", + migrationVersion, + classOf[SeqSerializerSetup], + classOf[SeqSerializerVerifier])) + testSpecifications.add( + new TestSpecification[Set[Int], Set[Int]]( + "traversable-serializer-set", + migrationVersion, + classOf[SetSerializerSetup], + classOf[SetSerializerVerifier])) + testSpecifications.add( + new TestSpecification[Seq[(Int, String)], Seq[(Int, String)]]( + "traversable-serializer-with-case-class", + migrationVersion, + classOf[SeqWithCaseClassSetup], + classOf[SeqWithCaseClassVerifier])) + testSpecifications.add( + new TestSpecification[Seq[Pojo], Seq[Pojo]]( + "traversable-serializer-with-pojo", + migrationVersion, + classOf[SeqWithPojoSetup], + classOf[SeqWithPojoVerifier])) + } + testSpecifications + } + + final class BitsetSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[BitSet] { + override def createPriorSerializer: TypeSerializer[BitSet] = + new TypeSerializerSupplier(bitsetTypeInfo).get() + + override def createTestData: BitSet = BitSet(3, 2, 0) + } + + final class BitsetSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[BitSet] { + override def createUpgradedSerializer: TypeSerializer[BitSet] = + new TypeSerializerSupplier(bitsetTypeInfo).get() + + override def testDataMatcher: Matcher[BitSet] = is(BitSet(3, 2, 0)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[BitSet]] = + TypeSerializerMatchers.isCompatibleAsIs[BitSet]() + } + + final class IndexedSeqSerializerSetup extends + TypeSerializerUpgradeTestBase.PreUpgradeSetup[IndexedSeq[Int]] { + override def createPriorSerializer: TypeSerializer[IndexedSeq[Int]] = + new TypeSerializerSupplier(indexedSeqTypeInfo).get() + + override def createTestData: IndexedSeq[Int] = IndexedSeq(1, 2, 3) + } + + final class IndexedSeqSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[IndexedSeq[Int]] { + override def createUpgradedSerializer: TypeSerializer[IndexedSeq[Int]] = + new TypeSerializerSupplier(indexedSeqTypeInfo).get() + + override def testDataMatcher: Matcher[IndexedSeq[Int]] = is(IndexedSeq(1, 2, 3)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[IndexedSeq[Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[IndexedSeq[Int]]() + } + + final class LinearSeqSerializerSetup extends + TypeSerializerUpgradeTestBase.PreUpgradeSetup[LinearSeq[Int]] { + override def createPriorSerializer: TypeSerializer[LinearSeq[Int]] = + new TypeSerializerSupplier(linearSeqTypeInfo).get() + + override def createTestData: LinearSeq[Int] = LinearSeq(2, 3, 4) + } + + final class LinearSeqSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[LinearSeq[Int]] { + override def createUpgradedSerializer: TypeSerializer[LinearSeq[Int]] = + new TypeSerializerSupplier(linearSeqTypeInfo).get() + + override def testDataMatcher: Matcher[LinearSeq[Int]] = is(LinearSeq(2, 3, 4)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[LinearSeq[Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[LinearSeq[Int]]() + } + + final class MapSerializerSetup extends + TypeSerializerUpgradeTestBase.PreUpgradeSetup[Map[String, Int]] { + override def createPriorSerializer: TypeSerializer[Map[String, Int]] = + new TypeSerializerSupplier(mapTypeInfo).get() + + override def createTestData: Map[String, Int] = Map("Apache" -> 0, "Flink" -> 1) + } + + final class MapSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[Map[String, Int]] { + override def createUpgradedSerializer: TypeSerializer[Map[String, Int]] = + new TypeSerializerSupplier(mapTypeInfo).get() + + override def testDataMatcher: Matcher[Map[String, Int]] = is(Map("Apache" -> 0, "Flink" -> 1)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Map[String, Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[Map[String, Int]]() + } + + final class MutableListSerializerSetup extends + TypeSerializerUpgradeTestBase.PreUpgradeSetup[mutable.MutableList[Int]] { + override def createPriorSerializer: TypeSerializer[mutable.MutableList[Int]] = + new TypeSerializerSupplier(mutableListTypeInfo).get() + + override def createTestData: mutable.MutableList[Int] = mutable.MutableList(1, 2, 3) + } + + final class MutableListSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[mutable.MutableList[Int]] { + override def createUpgradedSerializer: TypeSerializer[mutable.MutableList[Int]] = + new TypeSerializerSupplier(mutableListTypeInfo).get() + + override def testDataMatcher: Matcher[mutable.MutableList[Int]] = + is(mutable.MutableList(1, 2, 3)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[mutable.MutableList[Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[mutable.MutableList[Int]]() + } + + final class SeqSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[Int]] { + override def createPriorSerializer: TypeSerializer[Seq[Int]] = + new TypeSerializerSupplier(seqTypeInfo).get() + + override def createTestData: Seq[Int] = Seq(1, 2, 3) + } + + final class SeqSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[Int]] { + override def createUpgradedSerializer: TypeSerializer[Seq[Int]] = + new TypeSerializerSupplier(seqTypeInfo).get() + + override def testDataMatcher: Matcher[Seq[Int]] = is(Seq(1, 2, 3)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Seq[Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[Seq[Int]]() + } + + final class SetSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Set[Int]] { + override def createPriorSerializer: TypeSerializer[Set[Int]] = + new TypeSerializerSupplier(setTypeInfo).get() + + override def createTestData: Set[Int] = Set(2, 3, 4) + } + + final class SetSerializerVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[Set[Int]] { + override def createUpgradedSerializer: TypeSerializer[Set[Int]] = + new TypeSerializerSupplier(setTypeInfo).get() + + override def testDataMatcher: Matcher[Set[Int]] = is(Set(2, 3, 4)) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Set[Int]]] = + TypeSerializerMatchers.isCompatibleAsIs[Set[Int]]() + } + + final class SeqWithCaseClassSetup extends + TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[(Int, String)]] { + override def createPriorSerializer: TypeSerializer[Seq[(Int, String)]] = + new TypeSerializerSupplier(seqTupleTypeInfo).get() + + override def createTestData: Seq[(Int, String)] = Seq((0, "Apache"), (1, "Flink")) + } + + final class SeqWithCaseClassVerifier extends + TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[(Int, String)]] { + override def createUpgradedSerializer: TypeSerializer[Seq[(Int, String)]] = + new TypeSerializerSupplier(seqTupleTypeInfo).get() + + override def testDataMatcher: Matcher[Seq[(Int, String)]] = is(Seq((0, "Apache"), (1, "Flink"))) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Seq[(Int, String)]]] = + TypeSerializerMatchers.isCompatibleAsIs[Seq[(Int, String)]]() + } + + final class SeqWithPojoSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[Pojo]] { + override def createPriorSerializer: TypeSerializer[Seq[Pojo]] = + new TypeSerializerSupplier(seqPojoTypeInfo).get() + + override def createTestData: Seq[Pojo] = Seq(new Pojo("Apache", 0), new Pojo("Flink", 1)) + } + + final class SeqWithPojoVerifier extends TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[Pojo]] { + override def createUpgradedSerializer: TypeSerializer[Seq[Pojo]] = + new TypeSerializerSupplier(seqPojoTypeInfo).get() + + override def testDataMatcher: Matcher[Seq[Pojo]] = + is(Seq(new Pojo("Apache", 0), new Pojo("Flink", 1))) + + override def schemaCompatibilityMatcher(version: MigrationVersion): + Matcher[TypeSerializerSchemaCompatibility[Seq[Pojo]]] = + TypeSerializerMatchers.isCompatibleAsIs[Seq[Pojo]]() + } + + private class TypeSerializerSupplier[T](typeInfo: TypeInformation[T]) + extends Supplier[TypeSerializer[T]] { + override def get(): TypeSerializer[T] = { + typeInfo + .createSerializer(new ExecutionConfig) + .asInstanceOf[TypeSerializer[T]] + } + } + +}
