This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e80bb0bc96095305d04c7177c5aeb243bd892bba Author: Aljoscha Krettek <[email protected]> AuthorDate: Wed May 27 12:02:11 2020 +0200 [FLINK-13632] Remove TypeSerializerSnapshotMigrationTestBase All tests have been ported to the new test base. --- .../TypeSerializerSnapshotMigrationTestBase.java | 529 --------------------- 1 file changed, 529 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java deleted file mode 100644 index 6aa3481..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.testutils.migration.MigrationVersion; -import org.apache.flink.util.TestLogger; - -import org.hamcrest.Matcher; -import org.junit.Test; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.function.Supplier; - -import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.hamcrest.CoreMatchers.allOf; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * A test base for verifying {@link TypeSerializerSnapshot} migration. - * - * @param <ElementT> the element being serialized. - * - * @deprecated please use the newer {@link TypeSerializerUpgradeTestBase} - */ -@Deprecated -public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends TestLogger { - - private final TestSpecification<ElementT> testSpecification; - - protected TypeSerializerSnapshotMigrationTestBase(TestSpecification<ElementT> testSpecification) { - this.testSpecification = checkNotNull(testSpecification); - } - - @Test - public void serializerSnapshotIsSuccessfullyRead() { - TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest(); - - assertThat(snapshot, allOf( - notNullValue(), - instanceOf(TypeSerializerSnapshot.class) - )); - } - - @Test - public void specifiedNewSerializerHasExpectedCompatibilityResultsWithSnapshot() { - TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest(); - - TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer()); - - assertThat(result, testSpecification.schemaCompatibilityMatcher); - } - - @Test - public void restoredSerializerIsAbleToDeserializePreviousData() throws IOException { - TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest(); - TypeSerializer<ElementT> serializer = snapshot.restoreSerializer(); - - assertSerializerIsAbleToReadOldData(serializer); - } - - @Test - public void reconfiguredSerializerIsAbleToDeserializePreviousData() throws IOException { - TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest(); - TypeSerializerSchemaCompatibility<ElementT> compatibility = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer()); - - if (!compatibility.isCompatibleWithReconfiguredSerializer()) { - // this test only applies for reconfigured serializers. - return; - } - - TypeSerializer<ElementT> serializer = compatibility.getReconfiguredSerializer(); - assertSerializerIsAbleToReadOldData(serializer); - } - - @SuppressWarnings("deprecation") - @Test - public void movingForward() throws IOException { - TypeSerializerSnapshot<ElementT> previousSnapshot = snapshotUnderTest(); - TypeSerializer<ElementT> restoredSerializer = previousSnapshot.restoreSerializer(); - - TypeSerializerSnapshot<ElementT> nextSnapshot = restoredSerializer.snapshotConfiguration(); - assertThat(nextSnapshot, instanceOf(testSpecification.snapshotClass)); - - TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot); - - assertThat(nextSnapshotDeserialized, allOf( - notNullValue(), - not(instanceOf(TypeSerializerConfigSnapshot.class)) - )); - } - - @Test - public void restoreSerializerFromNewSerializerSnapshotIsAbleToDeserializePreviousData() throws IOException { - TypeSerializer<ElementT> newSerializer = testSpecification.createSerializer(); - - TypeSerializerSchemaCompatibility<ElementT> compatibility = - snapshotUnderTest().resolveSchemaCompatibility(newSerializer); - - final TypeSerializer<ElementT> nextSerializer; - if (compatibility.isCompatibleWithReconfiguredSerializer()) { - nextSerializer = compatibility.getReconfiguredSerializer(); - } else if (compatibility.isCompatibleAsIs()) { - nextSerializer = newSerializer; - } else { - // this test does not apply. - return; - } - - TypeSerializerSnapshot<ElementT> nextSnapshot = nextSerializer.snapshotConfiguration(); - - assertSerializerIsAbleToReadOldData(nextSnapshot.restoreSerializer()); - } - - // -------------------------------------------------------------------------------------------------------------- - // Test Helpers - // -------------------------------------------------------------------------------------------------------------- - - private TypeSerializerSnapshot<ElementT> writeAndThenReadTheSnapshot( - TypeSerializer<ElementT> serializer, - TypeSerializerSnapshot<ElementT> newSnapshot) throws IOException { - - DataOutputSerializer out = new DataOutputSerializer(128); - TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer); - - DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer()); - return readSnapshot(in); - } - - private TypeSerializerSnapshot<ElementT> snapshotUnderTest() { - DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation()); - try { - if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) { - return readPre17SnapshotFormat(input); - } else { - return readSnapshot(input); - } - } - catch (IOException e) { - throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(), e); - } - } - - @SuppressWarnings({"unchecked", "deprecation"}) - private TypeSerializerSnapshot<ElementT> readPre17SnapshotFormat(DataInputView input) throws IOException { - final ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializers = - TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(input, cl); - - return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1; - } - - private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException { - return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot( - in, Thread.currentThread().getContextClassLoader(), null); - } - - private DataInputView dataUnderTest() { - return contentsOf(testSpecification.getTestDataLocation()); - } - - private void assertSerializerIsAbleToReadOldData(TypeSerializer<ElementT> serializer) throws IOException { - DataInputView input = dataUnderTest(); - - final Matcher<ElementT> matcher = testSpecification.testDataElementMatcher; - for (int i = 0; i < testSpecification.testDataCount; i++) { - final ElementT result = serializer.deserialize(input); - assertThat(result, matcher); - } - } - - // -------------------------------------------------------------------------------------------------------------- - // Static Helpers - // -------------------------------------------------------------------------------------------------------------- - - private static DataInputView contentsOf(Path path) { - try { - byte[] bytes = Files.readAllBytes(path); - return new DataInputDeserializer(bytes); - } - catch (IOException e) { - throw new RuntimeException("Failed to read " + path, e); - } - } - - private static Path resourcePath(String resourceName) { - checkNotNull(resourceName, "resource name can not be NULL"); - try { - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - URL resource = cl.getResource(resourceName); - if (resource == null) { - throw new IllegalArgumentException("unable locate test data " + resourceName); - } - return Paths.get(resource.toURI()); - } - catch (URISyntaxException e) { - throw new RuntimeException("unable", e); - } - } - - // -------------------------------------------------------------------------------------------------------------- - // Test Specification - // -------------------------------------------------------------------------------------------------------------- - - /** - * Test Specification. - */ - @SuppressWarnings("WeakerAccess") - public static final class TestSpecification<T> { - private final Class<? extends TypeSerializer<T>> serializerType; - private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass; - private final String name; - private final MigrationVersion testMigrationVersion; - private Supplier<? extends TypeSerializer<T>> serializerProvider; - private Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher; - private String snapshotDataLocation; - private String testDataLocation; - private int testDataCount; - - @SuppressWarnings("unchecked") - private Matcher<T> testDataElementMatcher = (Matcher<T>) notNullValue(); - - @SuppressWarnings("unchecked") - public static <T> TestSpecification<T> builder( - String name, - Class<? extends TypeSerializer> serializerClass, - Class<? extends TypeSerializerSnapshot> snapshotClass, - MigrationVersion testMigrationVersion) { - - return new TestSpecification<>( - name, - (Class<? extends TypeSerializer<T>>) serializerClass, - (Class<? extends TypeSerializerSnapshot<T>>) snapshotClass, - testMigrationVersion); - } - - private TestSpecification( - String name, - Class<? extends TypeSerializer<T>> serializerType, - Class<? extends TypeSerializerSnapshot<T>> snapshotClass, - MigrationVersion testMigrationVersion) { - - this.name = name; - this.serializerType = serializerType; - this.snapshotClass = snapshotClass; - this.testMigrationVersion = testMigrationVersion; - } - - public TestSpecification<T> withNewSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) { - return withNewSerializerProvider(serializerProvider, TypeSerializerSchemaCompatibility.compatibleAsIs()); - } - - public TestSpecification<T> withNewSerializerProvider( - Supplier<? extends TypeSerializer<T>> serializerProvider, - TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) { - this.serializerProvider = serializerProvider; - this.schemaCompatibilityMatcher = hasSameCompatibilityAs(expectedCompatibilityResult); - return this; - } - - public TestSpecification<T> withSchemaCompatibilityMatcher( - Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher) { - this.schemaCompatibilityMatcher = schemaCompatibilityMatcher; - return this; - } - - public TestSpecification<T> withSnapshotDataLocation(String snapshotDataLocation) { - this.snapshotDataLocation = snapshotDataLocation; - return this; - } - - public TestSpecification<T> withTestData(String testDataLocation, int testDataCount) { - this.testDataLocation = testDataLocation; - this.testDataCount = testDataCount; - return this; - } - - public TestSpecification<T> withTestDataMatcher(Matcher<T> matcher) { - testDataElementMatcher = matcher; - return this; - } - - public TestSpecification<T> withTestDataCount(int expectedDataItmes) { - this.testDataCount = expectedDataItmes; - return this; - } - - private TypeSerializer<T> createSerializer() { - try { - return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get(); - } - catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException("serializer provider was not set, and creating the serializer reflectively failed.", e); - } - } - - private Path getTestDataLocation() { - return resourcePath(this.testDataLocation); - } - - private Path getSnapshotDataLocation() { - return resourcePath(this.snapshotDataLocation); - } - - private MigrationVersion getTestMigrationVersion() { - return testMigrationVersion; - } - - @Override - public String toString() { - return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName()); - } - - } - - /** - * Utility class to help build a collection of {@link TestSpecification} for - * multiple test migration versions. For each test specification added, - * an entry will be added for each specified migration version. - */ - public static final class TestSpecifications { - - private static final int DEFAULT_TEST_DATA_COUNT = 10; - private static final String DEFAULT_SNAPSHOT_FILENAME_FORMAT = "flink-%s-%s-snapshot"; - private static final String DEFAULT_TEST_DATA_FILENAME_FORMAT = "flink-%s-%s-data"; - - private final Collection<TestSpecification<?>> testSpecifications = new LinkedList<>(); - private final MigrationVersion[] testVersions; - - public TestSpecifications(MigrationVersion... testVersions) { - checkArgument( - testVersions.length > 0, - "At least one test migration version should be specified."); - this.testVersions = testVersions; - } - - /** - * Adds a test specification to be tested for all specified test versions. - * - * <p>This method adds the specification with pre-defined snapshot and data filenames, - * with the format "flink-<testVersion>-<specName>-<data/snapshot>", - * and each specification's test data count is assumed to always be 10. - * - * @param name test specification name. - * @param serializerClass class of the current serializer. - * @param snapshotClass class of the current serializer snapshot class. - * @param serializerProvider provider for an instance of the current serializer. - * - * @param <T> type of the test data. - */ - public <T> void add( - String name, - Class<? extends TypeSerializer> serializerClass, - Class<? extends TypeSerializerSnapshot> snapshotClass, - Supplier<? extends TypeSerializer<T>> serializerProvider) { - for (MigrationVersion testVersion : testVersions) { - testSpecifications.add( - TestSpecification.<T>builder( - getSpecNameForVersion(name, testVersion), - serializerClass, - snapshotClass, - testVersion) - .withNewSerializerProvider(serializerProvider) - .withSnapshotDataLocation( - String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name)) - .withTestData( - String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name), - DEFAULT_TEST_DATA_COUNT) - ); - } - } - - /** - * Adds a test specification to be tested for all specified test versions. - * - * <p>This method adds the specification with pre-defined snapshot and data filenames, - * with the format "flink-<testVersion>-<specName>-<data/snapshot>", - * and each specification's test data count is assumed to always be 10. - * - * @param <T> type of the test data. - */ - public <T> void addWithCompatibilityMatcher( - String name, - Class<? extends TypeSerializer> serializerClass, - Class<? extends TypeSerializerSnapshot> snapshotClass, - Supplier<? extends TypeSerializer<T>> serializerProvider, - Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher) { - for (MigrationVersion testVersion : testVersions) { - testSpecifications.add( - TestSpecification.<T>builder( - getSpecNameForVersion(name, testVersion), - serializerClass, - snapshotClass, - testVersion) - .withNewSerializerProvider(serializerProvider) - .withSchemaCompatibilityMatcher(schemaCompatibilityMatcher) - .withSnapshotDataLocation( - String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name)) - .withTestData( - String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name), - DEFAULT_TEST_DATA_COUNT) - ); - } - } - - /** - * Adds a test specification to be tested for all specified test versions. - * - * <p>This method adds the specification with pre-defined snapshot and data filenames, - * with the format "flink-<testVersion>-<specName>-<data/snapshot>", - * and each specification's test data count is assumed to always be 10. - * - * @param name test specification name. - * @param serializerClass class of the current serializer. - * @param snapshotClass class of the current serializer snapshot class. - * @param serializerProvider provider for an instance of the current serializer. - * @param elementMatcher an {@code hamcrest} matcher to match test data. - * - * @param <T> type of the test data. - */ - public <T> void add( - String name, - Class<? extends TypeSerializer> serializerClass, - Class<? extends TypeSerializerSnapshot> snapshotClass, - Supplier<? extends TypeSerializer<T>> serializerProvider, - Matcher<T> elementMatcher) { - for (MigrationVersion testVersion : testVersions) { - testSpecifications.add( - TestSpecification.<T>builder( - getSpecNameForVersion(name, testVersion), - serializerClass, - snapshotClass, - testVersion) - .withNewSerializerProvider(serializerProvider) - .withSnapshotDataLocation( - String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name)) - .withTestData( - String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name), - DEFAULT_TEST_DATA_COUNT) - .withTestDataMatcher(elementMatcher) - ); - } - } - - /** - * Adds a test specification to be tested for all specified test versions. - * - * @param name test specification name. - * @param serializerClass class of the current serializer. - * @param snapshotClass class of the current serializer snapshot class. - * @param serializerProvider provider for an instance of the current serializer. - * @param testSnapshotFilenameProvider provider for the filename of the test snapshot. - * @param testDataFilenameProvider provider for the filename of the test data. - * @param testDataCount expected number of records to be read in the test data files. - * - * @param <T> type of the test data. - */ - public <T> void add( - String name, - Class<? extends TypeSerializer> serializerClass, - Class<? extends TypeSerializerSnapshot> snapshotClass, - Supplier<? extends TypeSerializer<T>> serializerProvider, - TestResourceFilenameSupplier testSnapshotFilenameProvider, - TestResourceFilenameSupplier testDataFilenameProvider, - int testDataCount) { - for (MigrationVersion testVersion : testVersions) { - testSpecifications.add( - TestSpecification.<T>builder( - getSpecNameForVersion(name, testVersion), - serializerClass, - snapshotClass, - testVersion) - .withNewSerializerProvider(serializerProvider) - .withSnapshotDataLocation(testSnapshotFilenameProvider.get(testVersion)) - .withTestData(testDataFilenameProvider.get(testVersion), testDataCount) - ); - } - } - - public Collection<TestSpecification<?>> get() { - return Collections.unmodifiableCollection(testSpecifications); - } - - private static String getSpecNameForVersion(String baseName, MigrationVersion testVersion) { - return testVersion + "-" + baseName; - } - } - - /** - * Supplier of paths based on {@link MigrationVersion}. - */ - protected interface TestResourceFilenameSupplier { - String get(MigrationVersion testVersion); - } -}
