This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e791b1a29b675af4f290be9b68ae7f03b2de43c5
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Thu Dec 6 19:07:16 2018 +0800

    [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
---
 .../typeutils/base/GenericArraySerializer.java     |  4 +-
 .../base/GenericArraySerializerConfigSnapshot.java | 20 +++---
 .../base/GenericArraySerializerSnapshot.java       | 81 ++++++++++++++++++++++
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 4 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab..a4949fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
        // --------------------------------------------------------------------------------------------
 
        @Override
-       public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
-               return new GenericArraySerializerConfigSnapshot<>(this);
+       public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+               return new GenericArraySerializerSnapshot<>(this);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa241..8cbe76c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  * Point-in-time configuration of a {@link GenericArraySerializer}.
  *
  * @param <C> The component type.
+ *
+ * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
+ *             It has been replaced by {@link GenericArraySerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
 
        private static final int CURRENT_VERSION = 2;
@@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
        @Override
        public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-               checkState(componentClass != null && nestedSnapshot != null);
-
                if (newSerializer instanceof GenericArraySerializer) {
-                       GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
-                       TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
-                                       TypeSerializerSchemaCompatibility.compatibleAsIs() :
-                                       TypeSerializerSchemaCompatibility.incompatible();
-
-                       return nestedSnapshot.resolveCompatibilityWithNested(
-                                       compat, serializer.getComponentSerializer());
-               }
-               else {
+                       // delegate to the new snapshot class
+                       GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
+                       GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
+                       return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
+               } else {
                        return TypeSerializerSchemaCompatibility.incompatible();
                }
        }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 0000000..3f54dee
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Point-in-time configuration of a {@link GenericArraySerializer}.
+ *
+ * @param <C> The component type.
+ */
+public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+
+       private static final int CURRENT_VERSION = 1;
+
+       private Class<C> componentClass;
+
+       /**
+        * Constructor to be used for read instantiation.
+        */
+       public GenericArraySerializerSnapshot() {
+               super(GenericArraySerializer.class);
+       }
+
+       /**
+        * Constructor to be used for writing the snapshot.
+        */
+       public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
+               super(genericArraySerializer);
+               this.componentClass = genericArraySerializer.getComponentClass();
+       }
+
+       @Override
+       protected int getCurrentOuterSnapshotVersion() {
+               return CURRENT_VERSION;
+       }
+
+       @Override
+       protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+               out.writeUTF(componentClass.getName());
+       }
+
+       @Override
+       protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+               this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
+       }
+
+       @Override
+       protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
+               return new GenericArraySerializer<>(componentClass, componentSerializer);
+       }
+
+       @Override
+       protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
+       }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a..c6b49a4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
@@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
                // GenericArray<String>
 
-               final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+               final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
                        .withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
                        .withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
                        .withTestData("flink-1.6-array-type-serializer-data", 10);

Reply via email to