This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e791b1a29b675af4f290be9b68ae7f03b2de43c5 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Thu Dec 6 19:07:16 2018 +0800 [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot --- .../typeutils/base/GenericArraySerializer.java | 4 +- .../base/GenericArraySerializerConfigSnapshot.java | 20 +++--- .../base/GenericArraySerializerSnapshot.java | 81 ++++++++++++++++++++++ ...mpositeTypeSerializerSnapshotMigrationTest.java | 4 +- 4 files changed, 94 insertions(+), 15 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index 55ba8ab..a4949fb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> { // -------------------------------------------------------------------------------------------- @Override - public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() { - return new GenericArraySerializerConfigSnapshot<>(this); + public GenericArraySerializerSnapshot<C> snapshotConfiguration() { + return new GenericArraySerializerSnapshot<>(this); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java index b0aa241..8cbe76c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java @@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState; * Point-in-time configuration of a {@link GenericArraySerializer}. * * @param <C> The component type. + * + * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}. + * It has been replaced by {@link GenericArraySerializerSnapshot}. */ @Internal +@Deprecated public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> { private static final int CURRENT_VERSION = 2; @@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial @Override public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) { - checkState(componentClass != null && nestedSnapshot != null); - if (newSerializer instanceof GenericArraySerializer) { - GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer; - TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ? - TypeSerializerSchemaCompatibility.compatibleAsIs() : - TypeSerializerSchemaCompatibility.incompatible(); - - return nestedSnapshot.resolveCompatibilityWithNested( - compat, serializer.getComponentSerializer()); - } - else { + // delegate to the new snapshot class + GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer; + GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer); + return newSnapshot.resolveSchemaCompatibility(castedNewSerializer); + } else { return TypeSerializerSchemaCompatibility.incompatible(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java new file mode 100644 index 0000000..3f54dee --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * Point-in-time configuration of a {@link GenericArraySerializer}. + * + * @param <C> The component type. + */ +public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> { + + private static final int CURRENT_VERSION = 1; + + private Class<C> componentClass; + + /** + * Constructor to be used for read instantiation. + */ + public GenericArraySerializerSnapshot() { + super(GenericArraySerializer.class); + } + + /** + * Constructor to be used for writing the snapshot. + */ + public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) { + super(genericArraySerializer); + this.componentClass = genericArraySerializer.getComponentClass(); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return CURRENT_VERSION; + } + + @Override + protected void writeOuterSnapshot(DataOutputView out) throws IOException { + out.writeUTF(componentClass.getName()); + } + + @Override + protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); + } + + @Override + protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0]; + return new GenericArraySerializer<>(componentClass, componentSerializer); + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) { + return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java index c7b002a..c6b49a4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; -import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; @@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer // GenericArray<String> - final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class) + final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class) .withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE)) .withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot") .withTestData("flink-1.6-array-type-serializer-data", 10);