This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d7ef63d [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface d7ef63d is described below commit d7ef63df15aa78cf84a004da62fb2ba1716688ba Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Jan 22 18:14:16 2019 +0100 [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface --- .../runtime/types/CRowSerializerSnapshot.java | 64 ++++++++++++++++++++++ .../flink/table/runtime/types/CRowSerializer.scala | 52 ++++++++---------- 2 files changed, 86 insertions(+), 30 deletions(-) diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java new file mode 100644 index 0000000..4deea9b --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.types; + +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.Row; + +/** + * Snapshot class for {@link CRowSerializer}. + */ +public class CRowSerializerSnapshot extends CompositeTypeSerializerSnapshot<CRow, CRowSerializer> { + + private static final int CURRENT_VERSION = 1; + + /** + * Constructor for read instantiation. + */ + public CRowSerializerSnapshot() { + super(CRowSerializer.class); + } + + /** + * Constructor to create the snapshot for writing. + */ + public CRowSerializerSnapshot(CRowSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return CURRENT_VERSION; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(CRowSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.rowSerializer()}; + } + + @Override + protected CRowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + + @SuppressWarnings("unchecked") + TypeSerializer<Row> rowSerializer = (TypeSerializer<Row>) nestedSerializers[0]; + + return new CRowSerializer(rowSerializer); + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala index b3fe508..02e9160 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils._ import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.types.Row +@SerialVersionUID(2L) class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] { override def isImmutableType: Boolean = false @@ -80,41 +81,22 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = { - new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer)) - } - - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[CRow] = { - - configSnapshot match { - case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot => - val compatResult = CompatibilityUtil.resolveCompatibilityResult( - crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0, - classOf[UnloadableDummyTypeSerializer[_]], - crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1, - rowSerializer) - - if (compatResult.isRequiresMigration) { - if (compatResult.getConvertDeserializer != null) { - CompatibilityResult.requiresMigration( - new CRowSerializer( - new TypeDeserializerAdapter(compatResult.getConvertDeserializer)) - ) - } else { - CompatibilityResult.requiresMigration() - } - } else { - CompatibilityResult.compatible() - } - - case _ => CompatibilityResult.requiresMigration() - } + override def snapshotConfiguration(): TypeSerializerSnapshot[CRow] = { + new CRowSerializerSnapshot(this) } } object CRowSerializer { + /** + * [[CRowSerializer]] is not meant to be used for persisting state. In versions 1.6+ there + * were changes introduced that resulted in incompatibility in java serialization. Thus one + * cannot read state in 1.8+ from snapshot written with previous versions of Flink. + * + * Moreover this serializer is meant to be dropped once we migrate to the new planner + * implementation. + */ + @deprecated class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]]) extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) { @@ -123,6 +105,16 @@ object CRowSerializer { } override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION + + override def resolveSchemaCompatibility(newSerializer: TypeSerializer[CRow]) + : TypeSerializerSchemaCompatibility[CRow] = { + + CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new CRowSerializerSnapshot(), + getSingleNestedSerializerAndConfig.f1 + ) + } } object CRowSerializerConfigSnapshot {