This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c6be5ac85d22b57550c6a55837149234125bce5
Author: Igal Shilman <igal.shil...@data-artisans.com>
AuthorDate: Tue Feb 26 20:52:12 2019 +0100

    [FLINK-11753] [tests] Add hamcrest matchers for 
TypeSerializerSchemaCompatibility
---
 .../common/typeutils/TypeSerializerMatchers.java   | 199 +++++++++++++++++++++
 .../TypeSerializerSnapshotMigrationTestBase.java   |  33 +---
 .../base/EnumSerializerSnapshotMigrationTest.java  |  39 ++--
 .../avro/typeutils/AvroSerializerSnapshotTest.java |  44 +----
 4 files changed, 215 insertions(+), 100 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java
new file mode 100644
index 0000000..c23f9aa
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerMatchers.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Collection of useful {@link Matcher}s for {@link TypeSerializer} and 
{@link TypeSerializerSchemaCompatibility}.
+ */
+public final class TypeSerializerMatchers {
+
+       private TypeSerializerMatchers() {
+       }
+
+       // 
-------------------------------------------------------------------------------------------------------------
+       // Matcher Factories
+       // 
-------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Matches {@code compatibleAsIs} {@link 
TypeSerializerSchemaCompatibility}.
+        *
+        * @param <T> element type
+        * @return a {@code Matcher} that matches {@code compatibleAsIs} {@link 
TypeSerializerSchemaCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
isCompatibleAsIs() {
+               return 
propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs,
+                       "type serializer schema that is a compatible as is");
+       }
+
+       /**
+        * Matches {@code isIncompatible} {@link 
TypeSerializerSchemaCompatibility}.
+        *
+        * @param <T> element type
+        * @return a {@code Matcher} that matches {@code isIncompatible} {@link 
TypeSerializerSchemaCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
isIncompatible() {
+               return 
propertyMatcher(TypeSerializerSchemaCompatibility::isIncompatible,
+                       "type serializer schema that is incompatible");
+       }
+
+       /**
+        * Matches {@code isCompatibleAfterMigration} {@link 
TypeSerializerSchemaCompatibility}.
+        *
+        * @param <T> element type
+        * @return a {@code Matcher} that matches {@code 
isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
isCompatibleAfterMigration() {
+               return 
propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
+                       "type serializer schema that is compatible after 
migration");
+       }
+
+       /**
+        * Matches {@code isCompatibleWithReconfiguredSerializer} {@link 
TypeSerializerSchemaCompatibility}.
+        *
+        * @param <T> element type
+        * @return a {@code Matcher} that matches {@code 
isCompatibleWithReconfiguredSerializer} {@link 
TypeSerializerSchemaCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
isCompatibleWithReconfiguredSerializer() {
+               @SuppressWarnings("unchecked") Matcher<TypeSerializer<T>> 
anything =
+                       (Matcher<TypeSerializer<T>>) (Matcher<?>) 
CoreMatchers.anything();
+
+               return new CompatibleAfterReconfiguration<>(anything);
+       }
+
+       /**
+        * Matches {@code isCompatibleWithReconfiguredSerializer} {@link 
TypeSerializerSchemaCompatibility}.
+        *
+        * @param reconfiguredSerializerMatcher matches the reconfigured 
serializer.
+        * @param <T>                           element type
+        * @return a {@code Matcher} that matches {@code 
isCompatibleWithReconfiguredSerializer} {@link 
TypeSerializerSchemaCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
isCompatibleWithReconfiguredSerializer(
+               Matcher<? extends TypeSerializer<T>> 
reconfiguredSerializerMatcher) {
+
+               return new 
CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher);
+       }
+
+       /**
+        * Matches if the expected {@code TypeSerializerSchemaCompatibility} 
has the same compatibility as {@code expectedCompatibility}.
+        *
+        * @param expectedCompatibility the compatibility to match to.
+        * @param <T> element type.
+        * @return a {@code Matcher} that matches if it has the same 
compatibility as {@code expectedCompatibility}.
+        */
+       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
hasSameCompatibilityAs(
+               TypeSerializerSchemaCompatibility<T> expectedCompatibility) {
+
+               return new SchemaCompatibilitySameAs<>(expectedCompatibility);
+       }
+
+       // 
-------------------------------------------------------------------------------------------------------------
+       // Helpers
+       // 
-------------------------------------------------------------------------------------------------------------
+
+       private static <T> Matcher<T> propertyMatcher(Predicate<T> predicate, 
String matcherDescription) {
+               return new TypeSafeMatcher<T>() {
+
+                       @Override
+                       protected boolean matchesSafely(T item) {
+                               return predicate.test(item);
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText(matcherDescription);
+                       }
+               };
+       }
+
+       // 
-------------------------------------------------------------------------------------------------------------
+       // Matchers
+       // 
-------------------------------------------------------------------------------------------------------------
+
+       private static final class CompatibleAfterReconfiguration<T>
+                       extends 
TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<T>> {
+
+               private final Matcher<? extends TypeSerializer<T>> 
reconfiguredSerializerMatcher;
+
+               private CompatibleAfterReconfiguration(Matcher<? extends 
TypeSerializer<T>> reconfiguredSerializerMatcher) {
+                       this.reconfiguredSerializerMatcher = 
checkNotNull(reconfiguredSerializerMatcher);
+               }
+
+               @Override
+               protected boolean 
matchesSafely(TypeSerializerSchemaCompatibility<T> item, Description 
mismatchDescription) {
+                       if (!item.isCompatibleWithReconfiguredSerializer()) {
+                               mismatchDescription.appendText("serializer 
schema is not compatible with a reconfigured serializer");
+                               return false;
+                       }
+                       TypeSerializer<T> reconfiguredSerializer = 
item.getReconfiguredSerializer();
+                       if 
(!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) {
+                               
reconfiguredSerializerMatcher.describeMismatch(reconfiguredSerializer, 
mismatchDescription);
+                               return false;
+                       }
+                       return true;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("type serializer schema that is 
compatible after reconfiguration,")
+                               .appendText("with a reconfigured serializer 
matching ")
+                               
.appendDescriptionOf(reconfiguredSerializerMatcher);
+               }
+       }
+
+       private static class SchemaCompatibilitySameAs<T> extends 
TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>> {
+
+               private final TypeSerializerSchemaCompatibility<T> 
expectedCompatibility;
+
+               private 
SchemaCompatibilitySameAs(TypeSerializerSchemaCompatibility<T> 
expectedCompatibility) {
+                       this.expectedCompatibility = 
checkNotNull(expectedCompatibility);
+               }
+
+               @Override
+               protected boolean 
matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
+                       if (expectedCompatibility.isCompatibleAsIs()) {
+                               return 
testResultCompatibility.isCompatibleAsIs();
+                       }
+                       else if (expectedCompatibility.isIncompatible()) {
+                               return testResultCompatibility.isIncompatible();
+                       }
+                       else if 
(expectedCompatibility.isCompatibleAfterMigration()) {
+                               return 
testResultCompatibility.isCompatibleAfterMigration();
+                       }
+                       else if 
(expectedCompatibility.isCompatibleWithReconfiguredSerializer()) {
+                               return 
testResultCompatibility.isCompatibleWithReconfiguredSerializer();
+                       }
+                       return false;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("same compatibility as 
").appendValue(expectedCompatibility);
+               }
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 94081d4..c23a983 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -25,9 +25,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Description;
 import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.allOf;
@@ -286,7 +285,7 @@ public abstract class 
TypeSerializerSnapshotMigrationTestBase<ElementT> extends
                                Supplier<? extends TypeSerializer<T>> 
serializerProvider,
                                TypeSerializerSchemaCompatibility<T> 
expectedCompatibilityResult) {
                        this.serializerProvider = serializerProvider;
-                       this.schemaCompatibilityMatcher = 
hasSameCompatibilityType(expectedCompatibilityResult);
+                       this.schemaCompatibilityMatcher = 
hasSameCompatibilityAs(expectedCompatibilityResult);
                        return this;
                }
 
@@ -524,32 +523,4 @@ public abstract class 
TypeSerializerSnapshotMigrationTestBase<ElementT> extends
        protected interface TestResourceFilenameSupplier {
                String get(MigrationVersion testVersion);
        }
-
-       // 
--------------------------------------------------------------------------------------------------------------
-       // Utilities
-       // 
--------------------------------------------------------------------------------------------------------------
-
-       public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> 
hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> 
expectedCompatibilty) {
-               return new 
TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() {
-
-                       @Override
-                       protected boolean 
matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
-                               if (expectedCompatibilty.isCompatibleAsIs()) {
-                                       return 
testResultCompatibility.isCompatibleAsIs();
-                               } else if 
(expectedCompatibilty.isIncompatible()) {
-                                       return 
testResultCompatibility.isIncompatible();
-                               } else if 
(expectedCompatibilty.isCompatibleAfterMigration()) {
-                                       return 
testResultCompatibility.isCompatibleAfterMigration();
-                               } else if 
(expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) {
-                                       return 
testResultCompatibility.isCompatibleWithReconfiguredSerializer();
-                               }
-                               return false;
-                       }
-
-                       @Override
-                       public void describeTo(Description description) {
-                               description.appendText("same compatibility as 
").appendValue(expectedCompatibilty);
-                       }
-               };
-       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java
index ab4204e..d818974 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerSnapshotMigrationTest.java
@@ -18,19 +18,20 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.testutils.migration.MigrationVersion;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
 
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer;
 import static org.apache.flink.api.common.typeutils.base.TestEnum.BAR;
 import static org.apache.flink.api.common.typeutils.base.TestEnum.EMMA;
 import static org.apache.flink.api.common.typeutils.base.TestEnum.FOO;
@@ -62,43 +63,25 @@ public class EnumSerializerSnapshotMigrationTest extends 
TypeSerializerSnapshotM
                                EnumSerializer.class,
                                EnumSerializer.EnumSerializerSnapshot.class,
                                () -> new EnumSerializer(TestEnum.class),
-                               hasExpectedEnumSerializer(previousEnumValues));
+                               
isCompatibleWithReconfiguredSerializer(enumSerializerWith(previousEnumValues))
+               );
 
                return testSpecifications.get();
        }
 
+       private static Matcher<? extends TypeSerializer<TestEnum>> 
enumSerializerWith(final TestEnum[] expectedEnumValues) {
+               return new TypeSafeMatcher<EnumSerializer<TestEnum>>() {
 
-       private static Matcher<TypeSerializerSchemaCompatibility<TestEnum>> 
hasExpectedEnumSerializer(
-                       final TestEnum[] expectedEnumValues) {
-
-               return new 
TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<TestEnum>>() {
                        @Override
-                       protected boolean matchesSafely(
-                                       
TypeSerializerSchemaCompatibility<TestEnum> item,
-                                       Description mismatchDescription) {
-                               if 
(!item.isCompatibleWithReconfiguredSerializer()) {
-                                       
mismatchDescription.appendText("compatibility mismatch ").appendValue(item);
-                                       return false;
-                               }
-
-                               EnumSerializer<TestEnum> reconfiguredSerialized 
=
-                                               (EnumSerializer<TestEnum>) 
item.getReconfiguredSerializer();
-
-                               if 
(!Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues)) {
-                                       mismatchDescription
-                                                       
.appendText("reconfigured values are ")
-                                                       .appendValueList("{", 
", ", "}", reconfiguredSerialized.getValues());
-                                       return false;
-                               }
-
-                               return true;
+                       protected boolean 
matchesSafely(EnumSerializer<TestEnum> reconfiguredSerialized) {
+                               return 
Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues);
                        }
 
                        @Override
                        public void describeTo(Description description) {
                                description
-                                               .appendText("EnumSerializer 
with values ")
-                                               .appendValueList("{", ", ", 
"}", expectedEnumValues);
+                                       .appendText("EnumSerializer with values 
")
+                                       .appendValueList("{", ", ", "}", 
expectedEnumValues);
                        }
                };
        }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
index 8d3455f..1784cc1 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.formats.avro.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -30,16 +29,15 @@ import 
org.apache.flink.formats.avro.utils.TestDataGenerator;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecord;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
-import java.util.function.Function;
 
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAfterMigration;
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAsIs;
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isIncompatible;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -186,42 +184,6 @@ public class AvroSerializerSnapshotTest {
        }
 
        // 
---------------------------------------------------------------------------------------------------------------
-       // Matchers
-       // 
---------------------------------------------------------------------------------------------------------------
-
-       private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAsIs() {
-               return 
matcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, "compatible as 
is");
-       }
-
-       private Matcher<TypeSerializerSchemaCompatibility> 
isCompatibleAfterMigration() {
-               return 
matcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
-                       "compatible after migration");
-       }
-
-       private Matcher<TypeSerializerSchemaCompatibility> isIncompatible() {
-               return 
matcher(TypeSerializerSchemaCompatibility::isIncompatible,
-                       "incompatible");
-       }
-
-       private static <T> Matcher<T> matcher(Function<T, Boolean> predicate, 
String message) {
-               return new TypeSafeDiagnosingMatcher<T>() {
-
-                       @Override
-                       protected boolean matchesSafely(T item, Description 
mismatchDescription) {
-                               if (predicate.apply(item)) {
-                                       return true;
-                               }
-                               mismatchDescription.appendText("not 
").appendText(message);
-                               return false;
-                       }
-
-                       @Override
-                       public void describeTo(Description description) {
-                       }
-               };
-       }
-
-       // 
---------------------------------------------------------------------------------------------------------------
        // Utils
        // 
---------------------------------------------------------------------------------------------------------------
 

Reply via email to