This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf399ba08e02e7ab5b4bf20229a00f5eb27d6964
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Wed Jan 22 09:09:27 2020 +0100

    [FLINK-13632] Port BufferEntrySerializer upgrade test to 
TypeSerializerUpgradeTestBase
---
 .../api/operators/co/IntervalJoinOperator.java     |  18 +++-
 .../api/operators/co/BufferEntryMatchers.java      |  69 +++++++++++++
 .../co/BufferEntrySerializerMigrationTest.java     |  57 -----------
 .../co/BufferEntrySerializerUpgradeTest.java       | 112 +++++++++++++++++++++
 .../flink-1.6-buffer-entry-serializer-data         | Bin 160 -> 0 bytes
 .../flink-1.6-buffer-entry-serializer-snapshot     | Bin 935 -> 0 bytes
 .../flink-1.7-buffer-entry-serializer-data         | Bin 160 -> 0 bytes
 .../flink-1.7-buffer-entry-serializer-snapshot     | Bin 936 -> 0 bytes
 8 files changed, 195 insertions(+), 61 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 3c99022..5555f48 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -361,15 +361,25 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
         */
        @Internal
        @VisibleForTesting
-       static class BufferEntry<T> {
+       public static class BufferEntry<T> {
 
                private final T element;
                private final boolean hasBeenJoined;
 
-               BufferEntry(T element, boolean hasBeenJoined) {
+               public BufferEntry(T element, boolean hasBeenJoined) {
                        this.element = element;
                        this.hasBeenJoined = hasBeenJoined;
                }
+
+               @VisibleForTesting
+               public T getElement() {
+                       return element;
+               }
+
+               @VisibleForTesting
+               public boolean hasBeenJoined() {
+                       return hasBeenJoined;
+               }
        }
 
        /**
@@ -377,13 +387,13 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
         */
        @Internal
        @VisibleForTesting
-       static class BufferEntrySerializer<T> extends 
TypeSerializer<BufferEntry<T>> {
+       public static class BufferEntrySerializer<T> extends 
TypeSerializer<BufferEntry<T>> {
 
                private static final long serialVersionUID = 
-20197698803836236L;
 
                private final TypeSerializer<T> elementSerializer;
 
-               BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+               public BufferEntrySerializer(TypeSerializer<T> 
elementSerializer) {
                        this.elementSerializer = 
Preconditions.checkNotNull(elementSerializer);
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntryMatchers.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntryMatchers.java
new file mode 100644
index 0000000..4720add
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntryMatchers.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+/**
+ * {@link Matcher Matchers} for {@link IntervalJoinOperator.BufferEntry}.
+ */
+public class BufferEntryMatchers {
+
+       /**
+        * Creates a matcher that matches when the given element and {@code 
hasBeenJoined} value match
+        * the given matchers.
+        */
+       public static <T> Matcher<IntervalJoinOperator.BufferEntry<T>> 
bufferEntry(
+                       Matcher<T> elementMatcher,
+                       Matcher<Boolean> hasBeenJoinedMatcher) {
+               return new IsBufferEntry<>(elementMatcher, 
hasBeenJoinedMatcher);
+       }
+
+       static class IsBufferEntry<T> extends 
TypeSafeDiagnosingMatcher<IntervalJoinOperator.BufferEntry<T>> {
+               private final Matcher<T> elementMatcher;
+               private final Matcher<Boolean> hasBeenJoinedMatcher;
+
+               public IsBufferEntry(Matcher<T> elementMatcher, 
Matcher<Boolean> hasBeenJoinedMatcher) {
+                       this.elementMatcher = elementMatcher;
+                       this.hasBeenJoinedMatcher = hasBeenJoinedMatcher;
+               }
+
+               @Override
+               protected boolean matchesSafely(
+                               IntervalJoinOperator.BufferEntry<T> item, 
Description mismatchDescription) {
+                       mismatchDescription.appendText("BufferEntry with 
element ");
+                       mismatchDescription.appendValue(item.getElement());
+                       mismatchDescription.appendText(" with hasBeenJoined ");
+                       mismatchDescription.appendValue(item.hasBeenJoined());
+
+                       return elementMatcher.matches(item.getElement()) &&
+                                       
hasBeenJoinedMatcher.matches(item.hasBeenJoined());
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("BufferEntry with element ");
+                       elementMatcher.describeTo(description);
+                       description.appendText(" with hasBeenJoined ");
+                       hasBeenJoinedMatcher.describeTo(description);
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
deleted file mode 100644
index d4d2673..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import 
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
-import 
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer;
-import 
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot;
-import org.apache.flink.testutils.migration.MigrationVersion;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Collection;
-
-/**
- * State migration tests for {@link BufferEntrySerializer}.
- */
-@RunWith(Parameterized.class)
-public class BufferEntrySerializerMigrationTest extends 
TypeSerializerSnapshotMigrationTestBase<BufferEntry<String>> {
-
-       public 
BufferEntrySerializerMigrationTest(TestSpecification<BufferEntry<String>> 
testSpecification) {
-               super(testSpecification);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Parameterized.Parameters(name = "Test Specification = {0}")
-       public static Collection<TestSpecification<?>> testSpecifications() {
-
-               final TestSpecifications testSpecifications = new 
TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
-
-               testSpecifications.add(
-                       "buffer-entry-serializer",
-                       BufferEntrySerializer.class,
-                       BufferEntrySerializerSnapshot.class,
-                       () -> new 
BufferEntrySerializer<>(StringSerializer.INSTANCE));
-
-               return testSpecifications.get();
-       }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerUpgradeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerUpgradeTest.java
new file mode 100644
index 0000000..53ad66d
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerUpgradeTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import 
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
+import 
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.operators.co.BufferEntryMatchers.bufferEntry;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * State migration tests for {@link BufferEntrySerializer}.
+ */
+@RunWith(Parameterized.class)
+public class BufferEntrySerializerUpgradeTest
+               extends TypeSerializerUpgradeTestBase<BufferEntry<String>, 
BufferEntry<String>> {
+
+       public BufferEntrySerializerUpgradeTest(
+                       TestSpecification<BufferEntry<String>, 
BufferEntry<String>> testSpecification) {
+               super(testSpecification);
+       }
+
+       @Parameterized.Parameters(name = "Test Specification = {0}")
+       public static Collection<TestSpecification<?, ?>> testSpecifications() 
throws Exception {
+
+               ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
+               for (MigrationVersion migrationVersion : MIGRATION_VERSIONS) {
+                       testSpecifications.add(
+                                       new TestSpecification<>(
+                                                       
"buffer-entry-serializer",
+                                                       migrationVersion,
+                                                       
BufferEntrySerializerSetup.class,
+                                                       
BufferEntrySerializerVerifier.class));
+               }
+
+               return testSpecifications;
+       }
+
+       // 
----------------------------------------------------------------------------------------------
+       //  Specification for "buffer-entry-serializer"
+       // 
----------------------------------------------------------------------------------------------
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class BufferEntrySerializerSetup
+                       implements 
TypeSerializerUpgradeTestBase.PreUpgradeSetup<BufferEntry<String>> {
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               @Override
+               public TypeSerializer<BufferEntry<String>> 
createPriorSerializer() {
+                       return new 
BufferEntrySerializer(StringSerializer.INSTANCE);
+               }
+
+               @Override
+               public BufferEntry<String> createTestData() {
+                       return new BufferEntry<>("hello", false);
+               }
+       }
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class BufferEntrySerializerVerifier
+                       implements 
TypeSerializerUpgradeTestBase.UpgradeVerifier<BufferEntry<String>> {
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               @Override
+               public TypeSerializer<BufferEntry<String>> 
createUpgradedSerializer() {
+                       return new 
BufferEntrySerializer(StringSerializer.INSTANCE);
+               }
+
+               @Override
+               public Matcher<BufferEntry<String>> testDataMatcher() {
+                       return bufferEntry(is("hello"), is(false));
+               }
+
+               @Override
+               public 
Matcher<TypeSerializerSchemaCompatibility<BufferEntry<String>>> 
schemaCompatibilityMatcher(MigrationVersion version) {
+                       return TypeSerializerMatchers.isCompatibleAsIs();
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
 
b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
deleted file mode 100644
index a4af1fc..0000000
Binary files 
a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
 and /dev/null differ
diff --git 
a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
 
b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
deleted file mode 100644
index 6141180..0000000
Binary files 
a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
 and /dev/null differ
diff --git 
a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
 
b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
deleted file mode 100644
index 36c9dc7..0000000
Binary files 
a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
 and /dev/null differ
diff --git 
a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
 
b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
deleted file mode 100644
index af92e1b..0000000
Binary files 
a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
 and /dev/null differ

Reply via email to