This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit edeeda31745e63ec80caff97286343de8d2d2c43 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Mon Feb 25 16:45:54 2019 +0800 [FLINK-11741] [tests] Remove ensureCompatibility implementation in all test-related serializers This also fixes the snapshotConfiguration method of some test-related serializers, to return a proper snapshot of itself. --- .../TypeSerializerSerializationUtilTest.java | 31 +++++++++++------ .../typeutils/TypeSerializerSnapshotTest.java | 5 --- .../api/java/io/CollectionInputFormatTest.java | 6 ---- .../testutils/types/IntListSerializer.java | 6 ---- .../testutils/types/IntPairSerializer.java | 6 ---- .../testutils/types/StringPairSerializer.java | 10 ++---- .../flink/runtime/query/KvStateRegistryTest.java | 6 ---- .../state/InternalPriorityQueueTestBase.java | 39 +++++++++++++--------- .../runtime/state/OperatorStateBackendTest.java | 13 ++++---- .../state/heap/TestDuplicateSerializer.java | 10 ++---- .../testutils/recordutils/RecordSerializer.java | 10 ++---- .../TypeSerializerSnapshotMigrationITCase.java | 10 ------ 12 files changed, 57 insertions(+), 95 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java index 1a81c61..8e01b13 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java @@ -236,7 +236,12 @@ public class TypeSerializerSerializationUtilTest implements Serializable { @Test public void testAnonymousSerializerClassWithChangedSerialVersionUID() throws Exception { - TypeSerializer anonymousClassSerializer = new AbstractIntSerializer() {}; + TypeSerializer anonymousClassSerializer = new AbstractIntSerializer() { + @Override + public TypeSerializerSnapshot<Integer> snapshotConfiguration() { + return null; + } + }; // assert that our assumption holds Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass()); @@ -405,16 +410,6 @@ public class TypeSerializerSerializationUtilTest implements Serializable { } @Override - public TypeSerializerSnapshot<Integer> snapshotConfiguration() { - return IntSerializer.INSTANCE.snapshotConfiguration(); - } - - @Override - public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot); - } - - @Override public int getLength() { return IntSerializer.INSTANCE.getLength(); } @@ -433,5 +428,19 @@ public class TypeSerializerSerializationUtilTest implements Serializable { /** Just some serializer used for tests. */ public static class TestIntSerializer extends AbstractIntSerializer { private static final long serialVersionUID = -3684467698271707216L; + + @Override + public TypeSerializerSnapshot<Integer> snapshotConfiguration() { + return new TestIntSerializerSnapshot(); + } + } + + /** + * Test serializer snapshot. + */ + public static class TestIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> { + public TestIntSerializerSnapshot() { + super(TestIntSerializer::new); + } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java index ece8bc8..0e8305c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotTest.java @@ -180,11 +180,6 @@ public class TypeSerializerSnapshotTest { public TypeSerializerSnapshot<Object> snapshotConfiguration() { return new TestSerializerConfigSnapshot(); } - - @Override - public CompatibilityResult<Object> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - return compatible ? CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); - } } public static class TestSerializerConfigSnapshot extends TypeSerializerConfigSnapshot<Object> { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 95b4e3b..87df0f3 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.io; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -393,10 +392,5 @@ public class CollectionInputFormatTest { public TypeSerializerConfigSnapshot<ElementType> snapshotConfiguration() { throw new UnsupportedOperationException(); } - - @Override - public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - throw new UnsupportedOperationException(); - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index e810337..7d06230 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; import java.util.Arrays; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -123,9 +122,4 @@ public class IntListSerializer extends TypeSerializer<IntList> { public TypeSerializerConfigSnapshot<IntList> snapshotConfiguration() { throw new UnsupportedOperationException(); } - - @Override - public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - throw new UnsupportedOperationException(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index 2e40a0e..1392f9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -132,9 +131,4 @@ public class IntPairSerializer extends TypeSerializer<IntPair> { public TypeSerializerConfigSnapshot<IntPair> snapshotConfiguration() { throw new UnsupportedOperationException(); } - - @Override - public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - throw new UnsupportedOperationException(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index af64a59..2d28b54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -20,9 +20,8 @@ package org.apache.flink.runtime.operators.testutils.types; import java.io.IOException; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.StringValue; @@ -97,12 +96,7 @@ public class StringPairSerializer extends TypeSerializer<StringPair> { } @Override - public TypeSerializerConfigSnapshot<StringPair> snapshotConfiguration() { - throw new UnsupportedOperationException(); - } - - @Override - public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { + public TypeSerializerSnapshot<StringPair> snapshotConfiguration() { throw new UnsupportedOperationException(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java index bd94639..dc5e5d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -404,10 +403,5 @@ public class KvStateRegistryTest extends TestLogger { public TypeSerializerConfigSnapshot<String> snapshotConfiguration() { return null; } - - @Override - public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - return null; - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java index 2f4a33e..5f613f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.state; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -496,17 +496,11 @@ public abstract class InternalPriorityQueueTestBase extends TestLogger { } @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { + public Snapshot snapshotConfiguration() { return new Snapshot(getRevision()); } - @Override - public CompatibilityResult<TestElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - return (configSnapshot instanceof Snapshot) && ((Snapshot) configSnapshot).revision <= getRevision() ? - CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); - } - - public static class Snapshot extends TypeSerializerConfigSnapshot { + public static class Snapshot implements TypeSerializerSnapshot<TestElement> { private int revision; @@ -528,7 +522,7 @@ public abstract class InternalPriorityQueueTestBase extends TestLogger { } @Override - public int getVersion() { + public int getCurrentVersion() { return 0; } @@ -537,16 +531,31 @@ public abstract class InternalPriorityQueueTestBase extends TestLogger { } @Override - public void write(DataOutputView out) throws IOException { - super.write(out); + public void writeSnapshot(DataOutputView out) throws IOException { out.writeInt(revision); } @Override - public void read(DataInputView in) throws IOException { - super.read(in); + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { this.revision = in.readInt(); } + + @Override + public TypeSerializer<TestElement> restoreSerializer() { + return new TestElementSerializer(); + } + + @Override + public TypeSerializerSchemaCompatibility<TestElement> resolveSchemaCompatibility(TypeSerializer<TestElement> newSerializer) { + if (!(newSerializer instanceof TestElementSerializer)) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + TestElementSerializer testElementSerializer = (TestElementSerializer) newSerializer; + return (revision <= testElementSerializer.getRevision()) + ? TypeSerializerSchemaCompatibility.compatibleAsIs() + : TypeSerializerSchemaCompatibility.incompatible(); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 78e6f4e..f842fb2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -23,9 +23,8 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; @@ -340,12 +339,14 @@ public class OperatorStateBackendTest { @Override public TypeSerializerSnapshot<Integer> snapshotConfiguration() { - return IntSerializer.INSTANCE.snapshotConfiguration(); + return new VerifyingIntSerializerSnapshot(); } + } - @Override - public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot); + @SuppressWarnings("WeakerAccess") + public static class VerifyingIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> { + public VerifyingIntSerializerSnapshot() { + super(() -> new VerifyingIntSerializer(Thread.currentThread().getContextClassLoader(), new AtomicInteger())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java index a7e4ac8..da7fef8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestDuplicateSerializer.java @@ -18,9 +18,8 @@ package org.apache.flink.runtime.state.heap; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -113,12 +112,7 @@ public class TestDuplicateSerializer extends TypeSerializer<Integer> { } @Override - public TypeSerializerConfigSnapshot<Integer> snapshotConfiguration() { - throw new UnsupportedOperationException(); - } - - @Override - public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { + public TypeSerializerSnapshot<Integer> snapshotConfiguration() { throw new UnsupportedOperationException(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java index a892bf4..ce060f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java @@ -21,9 +21,8 @@ package org.apache.flink.runtime.testutils.recordutils; import java.io.IOException; -import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Record; @@ -135,12 +134,7 @@ public final class RecordSerializer extends TypeSerializer<Record> { } @Override - public TypeSerializerConfigSnapshot<Record> snapshotConfiguration() { - throw new UnsupportedOperationException(); - } - - @Override - public CompatibilityResult<Record> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { + public TypeSerializerSnapshot<Record> snapshotConfiguration() { throw new UnsupportedOperationException(); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java index 3aa3963..a4d26de 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java @@ -179,16 +179,6 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes return new TestSerializerSnapshot(configPayload); } - /* - @Override - public CompatibilityResult<Long> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - return (configSnapshot instanceof TestSerializerSnapshot - && ((TestSerializerSnapshot) configSnapshot).configPayload.equals(configPayload)) - ? CompatibilityResult.compatible() - : CompatibilityResult.requiresMigration(); - } - */ - @Override public TypeSerializer<Long> duplicate() { return this;