This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 151cd8cf864e8fce73724c2bd3e7d188ff91c71a
Author: Igal Shilman <igal.shil...@data-artisans.com>
AuthorDate: Wed Feb 27 21:15:13 2019 +0100

    [FLINK-11773] [tests] Add unit tests for KryoSerializerSnapshot
    
    This closes #7852.
---
 .../runtime/kryo/KryoSerializerSnapshotTest.java   | 167 +++++++++++++++++++++
 1 file changed, 167 insertions(+)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotTest.java
new file mode 100644
index 0000000..fbfa765
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests.Animal;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests.Dog;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests.DogKryoSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests.DogV2KryoSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests.Parrot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAsIs;
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer;
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isIncompatible;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link KryoSerializerSnapshot}.
+ */
+public class KryoSerializerSnapshotTest {
+
+       private ExecutionConfig oldConfig;
+       private ExecutionConfig newConfig;
+
+       @Before
+       public void setup() {
+               oldConfig = new ExecutionConfig();
+               newConfig = new ExecutionConfig();
+       }
+
+       @Test
+       public void sanityTest() {
+               assertThat(resolveKryoCompatibility(oldConfig, newConfig), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void addingTypesIsCompatibleAfterReconfiguration() {
+               oldConfig.registerKryoType(Animal.class);
+
+               newConfig.registerKryoType(Animal.class);
+               newConfig.registerTypeWithKryoSerializer(Dog.class, 
DogKryoSerializer.class);
+
+               assertThat(resolveKryoCompatibility(oldConfig, newConfig),
+                       isCompatibleWithReconfiguredSerializer());
+       }
+
+       @Test
+       public void replacingKryoSerializersIsCompatibleAsIs() {
+               oldConfig.registerKryoType(Animal.class);
+               oldConfig.registerTypeWithKryoSerializer(Dog.class, 
DogKryoSerializer.class);
+
+               newConfig.registerKryoType(Animal.class);
+               newConfig.registerTypeWithKryoSerializer(Dog.class, 
DogV2KryoSerializer.class);
+
+               // it is compatible as is, since Kryo does not expose 
compatibility API with KryoSerializers
+               // so we can not know if DogKryoSerializer is compatible with 
DogV2KryoSerializer
+               assertThat(resolveKryoCompatibility(oldConfig, newConfig),
+                       isCompatibleAsIs());
+       }
+
+       @Test
+       public void reorderingIsCompatibleAfterReconfiguration() {
+               oldConfig.registerKryoType(Parrot.class);
+               oldConfig.registerKryoType(Dog.class);
+
+               newConfig.registerKryoType(Dog.class);
+               newConfig.registerKryoType(Parrot.class);
+
+               assertThat(resolveKryoCompatibility(oldConfig, newConfig),
+                       isCompatibleWithReconfiguredSerializer());
+       }
+
+       @Test
+       public void tryingToRestoreWithNonExistingClassShouldBeIncompatible() 
throws IOException {
+               TypeSerializerSnapshot<Animal> restoredSnapshot = 
kryoSnapshotWithMissingClass();
+
+               TypeSerializer<Animal> currentSerializer = new 
KryoSerializer<>(Animal.class, new ExecutionConfig());
+
+               
assertThat(restoredSnapshot.resolveSchemaCompatibility(currentSerializer),
+                       isIncompatible());
+       }
+
+       // 
-------------------------------------------------------------------------------------------------------
+       // Helpers
+       // 
-------------------------------------------------------------------------------------------------------
+
+       private static TypeSerializerSnapshot<Animal> 
kryoSnapshotWithMissingClass() throws IOException {
+               DataInputView in = new 
DataInputDeserializer(unLoadableSnapshotBytes());
+
+               return TypeSerializerSnapshot.readVersionedSnapshot(
+                       in,
+                       KryoSerializerSnapshotTest.class.getClassLoader());
+       }
+
+       /**
+        * This method returns the bytes of a serialized {@link 
KryoSerializerSnapshot}, that contains a Kryo registration
+        * of a class that does not exists in the current classpath.
+        */
+       private static byte[] unLoadableSnapshotBytes() throws IOException {
+               final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+               ClassLoader tempClassLoader =
+                       new URLClassLoader(new URL[0], 
KryoSerializerSnapshotTest.class.getClassLoader());
+               try {
+                       
Thread.currentThread().setContextClassLoader(tempClassLoader);
+
+                       ExecutionConfig conf = 
registerClassThatIsNotInClassPath(tempClassLoader);
+
+                       KryoSerializer<Animal> previousSerializer = new 
KryoSerializer<>(Animal.class, conf);
+                       TypeSerializerSnapshot<Animal> previousSnapshot = 
previousSerializer.snapshotConfiguration();
+
+                       DataOutputSerializer out = new 
DataOutputSerializer(4096);
+                       TypeSerializerSnapshot.writeVersionedSnapshot(out, 
previousSnapshot);
+                       return out.getCopyOfBuffer();
+               }
+               finally {
+                       
Thread.currentThread().setContextClassLoader(originalClassLoader);
+               }
+       }
+
+       private static ExecutionConfig 
registerClassThatIsNotInClassPath(ClassLoader tempClassLoader) {
+               Object objectForClassNotInClassPath =
+                       
CommonTestUtils.createObjectForClassNotInClassPath(tempClassLoader);
+
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerKryoType(objectForClassNotInClassPath.getClass());
+               return conf;
+       }
+
+       private static TypeSerializerSchemaCompatibility<Animal> 
resolveKryoCompatibility(ExecutionConfig previous, ExecutionConfig current) {
+               KryoSerializer<Animal> previousSerializer = new 
KryoSerializer<>(Animal.class, previous);
+               TypeSerializerSnapshot<Animal> previousSnapshot = 
previousSerializer.snapshotConfiguration();
+
+               TypeSerializer<Animal> currentSerializer = new 
KryoSerializer<>(Animal.class, current);
+               return 
previousSnapshot.resolveSchemaCompatibility(currentSerializer);
+       }
+}

Reply via email to