This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4c6be5ac85d22b57550c6a55837149234125bce5 Author: Igal Shilman <igal.shil...@data-artisans.com> AuthorDate: Tue Feb 26 20:52:12 2019 +0100 [FLINK-11753] [tests] Add hamcrest matchers for TypeSerializerSchemaCompatibility --- .../common/typeutils/TypeSerializerMatchers.java | 199 +++++++++++++++++++++ .../TypeSerializerSnapshotMigrationTestBase.java | 33 +--- .../base/EnumSerializerSnapshotMigrationTest.java | 39 ++-- .../avro/typeutils/AvroSerializerSnapshotTest.java | 44 +---- 4 files changed, 215 insertions(+), 100 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java new file mode 100644 index 0000000..c23f9aa --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; + +import java.util.function.Predicate; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link TypeSerializerSchemaCompatibility}. + */ +public final class TypeSerializerMatchers { + + private TypeSerializerMatchers() { + } + + // ------------------------------------------------------------------------------------------------------------- + // Matcher Factories + // ------------------------------------------------------------------------------------------------------------- + + /** + * Matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}. + * + * @param <T> element type + * @return a {@code Matcher} that matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleAsIs() { + return propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, + "type serializer schema that is a compatible as is"); + } + + /** + * Matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}. + * + * @param <T> element type + * @return a {@code Matcher} that matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isIncompatible() { + return propertyMatcher(TypeSerializerSchemaCompatibility::isIncompatible, + "type serializer schema that is incompatible"); + } + + /** + * Matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}. + * + * @param <T> element type + * @return a {@code Matcher} that matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleAfterMigration() { + return propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, + "type serializer schema that is compatible after migration"); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}. + * + * @param <T> element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleWithReconfiguredSerializer() { + @SuppressWarnings("unchecked") Matcher<TypeSerializer<T>> anything = + (Matcher<TypeSerializer<T>>) (Matcher<?>) CoreMatchers.anything(); + + return new CompatibleAfterReconfiguration<>(anything); + } + + /** + * Matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}. + * + * @param reconfiguredSerializerMatcher matches the reconfigured serializer. + * @param <T> element type + * @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleWithReconfiguredSerializer( + Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher) { + + return new CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher); + } + + /** + * Matches if the expected {@code TypeSerializerSchemaCompatibility} has the same compatibility as {@code expectedCompatibility}. + * + * @param expectedCompatibility the compatibility to match to. + * @param <T> element type. + * @return a {@code Matcher} that matches if it has the same compatibility as {@code expectedCompatibility}. + */ + public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityAs( + TypeSerializerSchemaCompatibility<T> expectedCompatibility) { + + return new SchemaCompatibilitySameAs<>(expectedCompatibility); + } + + // ------------------------------------------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------------------------------------------- + + private static <T> Matcher<T> propertyMatcher(Predicate<T> predicate, String matcherDescription) { + return new TypeSafeMatcher<T>() { + + @Override + protected boolean matchesSafely(T item) { + return predicate.test(item); + } + + @Override + public void describeTo(Description description) { + description.appendText(matcherDescription); + } + }; + } + + // ------------------------------------------------------------------------------------------------------------- + // Matchers + // ------------------------------------------------------------------------------------------------------------- + + private static final class CompatibleAfterReconfiguration<T> + extends TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<T>> { + + private final Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher; + + private CompatibleAfterReconfiguration(Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher) { + this.reconfiguredSerializerMatcher = checkNotNull(reconfiguredSerializerMatcher); + } + + @Override + protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> item, Description mismatchDescription) { + if (!item.isCompatibleWithReconfiguredSerializer()) { + mismatchDescription.appendText("serializer schema is not compatible with a reconfigured serializer"); + return false; + } + TypeSerializer<T> reconfiguredSerializer = item.getReconfiguredSerializer(); + if (!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) { + reconfiguredSerializerMatcher.describeMismatch(reconfiguredSerializer, mismatchDescription); + return false; + } + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("type serializer schema that is compatible after reconfiguration,") + .appendText("with a reconfigured serializer matching ") + .appendDescriptionOf(reconfiguredSerializerMatcher); + } + } + + private static class SchemaCompatibilitySameAs<T> extends TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>> { + + private final TypeSerializerSchemaCompatibility<T> expectedCompatibility; + + private SchemaCompatibilitySameAs(TypeSerializerSchemaCompatibility<T> expectedCompatibility) { + this.expectedCompatibility = checkNotNull(expectedCompatibility); + } + + @Override + protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) { + if (expectedCompatibility.isCompatibleAsIs()) { + return testResultCompatibility.isCompatibleAsIs(); + } + else if (expectedCompatibility.isIncompatible()) { + return testResultCompatibility.isIncompatible(); + } + else if (expectedCompatibility.isCompatibleAfterMigration()) { + return testResultCompatibility.isCompatibleAfterMigration(); + } + else if (expectedCompatibility.isCompatibleWithReconfiguredSerializer()) { + return testResultCompatibility.isCompatibleWithReconfiguredSerializer(); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("same compatibility as ").appendValue(expectedCompatibility); + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java index 94081d4..c23a983 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java @@ -25,9 +25,7 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.testutils.migration.MigrationVersion; import org.apache.flink.util.TestLogger; -import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; import org.junit.Test; import java.io.IOException; @@ -42,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.function.Supplier; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.hamcrest.CoreMatchers.allOf; @@ -286,7 +285,7 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends Supplier<? extends TypeSerializer<T>> serializerProvider, TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) { this.serializerProvider = serializerProvider; - this.schemaCompatibilityMatcher = hasSameCompatibilityType(expectedCompatibilityResult); + this.schemaCompatibilityMatcher = hasSameCompatibilityAs(expectedCompatibilityResult); return this; } @@ -524,32 +523,4 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends protected interface TestResourceFilenameSupplier { String get(MigrationVersion testVersion); } - - // -------------------------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------------------------- - - public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> expectedCompatibilty) { - return new TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() { - - @Override - protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) { - if (expectedCompatibilty.isCompatibleAsIs()) { - return testResultCompatibility.isCompatibleAsIs(); - } else if (expectedCompatibilty.isIncompatible()) { - return testResultCompatibility.isIncompatible(); - } else if (expectedCompatibilty.isCompatibleAfterMigration()) { - return testResultCompatibility.isCompatibleAfterMigration(); - } else if (expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) { - return testResultCompatibility.isCompatibleWithReconfiguredSerializer(); - } - return false; - } - - @Override - public void describeTo(Description description) { - description.appendText("same compatibility as ").appendValue(expectedCompatibilty); - } - }; - } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java index ab4204e..d818974 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java @@ -18,19 +18,20 @@ package org.apache.flink.api.common.typeutils.base; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; import org.apache.flink.testutils.migration.MigrationVersion; import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer; import static org.apache.flink.api.common.typeutils.base.TestEnum.BAR; import static org.apache.flink.api.common.typeutils.base.TestEnum.EMMA; import static org.apache.flink.api.common.typeutils.base.TestEnum.FOO; @@ -62,43 +63,25 @@ public class EnumSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM EnumSerializer.class, EnumSerializer.EnumSerializerSnapshot.class, () -> new EnumSerializer(TestEnum.class), - hasExpectedEnumSerializer(previousEnumValues)); + isCompatibleWithReconfiguredSerializer(enumSerializerWith(previousEnumValues)) + ); return testSpecifications.get(); } + private static Matcher<? extends TypeSerializer<TestEnum>> enumSerializerWith(final TestEnum[] expectedEnumValues) { + return new TypeSafeMatcher<EnumSerializer<TestEnum>>() { - private static Matcher<TypeSerializerSchemaCompatibility<TestEnum>> hasExpectedEnumSerializer( - final TestEnum[] expectedEnumValues) { - - return new TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<TestEnum>>() { @Override - protected boolean matchesSafely( - TypeSerializerSchemaCompatibility<TestEnum> item, - Description mismatchDescription) { - if (!item.isCompatibleWithReconfiguredSerializer()) { - mismatchDescription.appendText("compatibility mismatch ").appendValue(item); - return false; - } - - EnumSerializer<TestEnum> reconfiguredSerialized = - (EnumSerializer<TestEnum>) item.getReconfiguredSerializer(); - - if (!Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues)) { - mismatchDescription - .appendText("reconfigured values are ") - .appendValueList("{", ", ", "}", reconfiguredSerialized.getValues()); - return false; - } - - return true; + protected boolean matchesSafely(EnumSerializer<TestEnum> reconfiguredSerialized) { + return Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues); } @Override public void describeTo(Description description) { description - .appendText("EnumSerializer with values ") - .appendValueList("{", ", ", "}", expectedEnumValues); + .appendText("EnumSerializer with values ") + .appendValueList("{", ", ", "}", expectedEnumValues); } }; } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java index 8d3455f..1784cc1 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java @@ -19,7 +19,6 @@ package org.apache.flink.formats.avro.typeutils; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; @@ -30,16 +29,15 @@ import org.apache.flink.formats.avro.utils.TestDataGenerator; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; -import java.util.function.Function; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAfterMigration; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAsIs; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isIncompatible; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -186,42 +184,6 @@ public class AvroSerializerSnapshotTest { } // --------------------------------------------------------------------------------------------------------------- - // Matchers - // --------------------------------------------------------------------------------------------------------------- - - private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAsIs() { - return matcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, "compatible as is"); - } - - private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAfterMigration() { - return matcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration, - "compatible after migration"); - } - - private Matcher<TypeSerializerSchemaCompatibility> isIncompatible() { - return matcher(TypeSerializerSchemaCompatibility::isIncompatible, - "incompatible"); - } - - private static <T> Matcher<T> matcher(Function<T, Boolean> predicate, String message) { - return new TypeSafeDiagnosingMatcher<T>() { - - @Override - protected boolean matchesSafely(T item, Description mismatchDescription) { - if (predicate.apply(item)) { - return true; - } - mismatchDescription.appendText("not ").appendText(message); - return false; - } - - @Override - public void describeTo(Description description) { - } - }; - } - - // --------------------------------------------------------------------------------------------------------------- // Utils // ---------------------------------------------------------------------------------------------------------------