This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07c763d12a9e9f76ef00049cb53dd57217962ac5
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Feb 27 12:45:29 2019 +0800

    [FLINK-11755] [core] Drop ensureCompatibility from TypeSerializer

    If users are still using the TypeSerializerConfigSnapshot when upgrading
    to 1.8, they have to do the following:

    - Change the type serializer's config snapshot to implement
      TypeSerializerSnapshot, rather than extending
      TypeSerializerConfigSnapshot (as previously).
    - If the above step was completed, then the upgrade is done.
    - Otherwise, if changing to implement TypeSerializerSnapshot directly
      in-place as the same class isn't possible (perhaps because the new
      snapshot is intended to have completely different written contents or
      intended to have a different class name), retain the old serializer
      snapshot class (extending TypeSerializerConfigSnapshot) under the same
      name and give the updated serializer snapshot class (the one extending
      TypeSerializerSnapshot) a new name.
    - Override the
      TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
      method to perform the compatibility check based on configuration
      written by the old serializer snapshot class.

    This closes #7821.
---
 .../flink/api/common/typeutils/TypeSerializer.java | 51 ++++------------------
 .../typeutils/TypeSerializerConfigSnapshot.java    | 23 +++++++---
 2 files changed, 25 insertions(+), 49 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 80b28ae..f0036c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,27 +36,20 @@ import java.io.Serializable;
  * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
  *
  * <p>This section is relevant if you implemented a TypeSerializer in Flink versions up to 1.6 and want
- * to adapt that implementation to the new interfaces that support proper state schema evolution. Please
- * follow these steps:
+ * to adapt that implementation to the new interfaces that support proper state schema evolution, while maintaining
+ * backwards compatibility. Please follow these steps:
  *
  * <ul>
  * <li>Change the type serializer's config snapshot to implement {@link TypeSerializerSnapshot}, rather
  * than extending {@code TypeSerializerConfigSnapshot} (as previously).
- * <li>Move the compatibility check from the {@link TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)}
- * method to the {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} method.
- * </ul>
- *
- * <p><b>Maintaining Backwards Compatibility</b>
- *
- * <p>If you want your serializer to be able to restore checkpoints from Flink 1.6 and before, add the steps
- * below in addition to the steps above.
- *
- * <ul>
- * <li>Retain the old serializer snapshot class (extending {@code TypeSerializerConfigSnapshot}) under
+ * <li>If the above step was completed, then the upgrade is done. Otherwise, if changing to implement
+ * {@link TypeSerializerSnapshot} directly in-place as the same class isn't possible (perhaps because the new snapshot
+ * is intended to have completely different written contents or intended to have a different class name),
+ * retain the old serializer snapshot class (extending {@code TypeSerializerConfigSnapshot}) under
  * the same name and give the updated serializer snapshot class (the one extending {@code TypeSerializerSnapshot})
  * a new name.
- * <li>Keep the {@link TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)} on the TypeSerializer
- * as well.
+ * <li>Override the {@code TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)}
+ * method to perform the compatibility check based on configuration written by the old serializer snapshot class.
  * </ul>
  *
  * @param <T> The data type that the serializer serializes.
@@ -203,32 +196,4 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * @return snapshot of the serializer's current configuration (cannot be {@code null}).
 	 */
 	public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
-
-	// --------------------------------------------------------------------------------------------
-	// Deprecated methods for backwards compatibility
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This method is deprecated. It used to resolved compatibility of the serializer with serializer
-	 * config snapshots in checkpoints. The responsibility for this has moved to
-	 * {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
-	 *
-	 * <p>New serializers should not override this method any more! Serializers implemented against Flink
-	 * versions up to 1.6 should still work, but should adjust to new model to enable state evolution and
-	 * be future-proof. See the class-level comments, section <i>"Upgrading TypeSerializers to the new
-	 * TypeSerializerSnapshot model"</i> for details.
-	 *
-	 * @deprecated Replaced by {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
-	 */
-	@Deprecated
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		throw new UnsupportedOperationException(
-			"This method is not supported any more - please evolve your TypeSerializer the following way:\n\n" +
-			" - If you have a serializer whose 'ensureCompatibility()' method delegates to another\n" +
-			" serializer's 'ensureCompatibility()', please use" +
-			"'CompatibilityUtil.resolveCompatibilityResult(snapshot, this)' instead.\n\n" +
-			" - If you updated your serializer (removed overriding the 'ensureCompatibility()' method),\n" +
-			" please also update the corresponding config snapshot to not extend 'TypeSerializerConfigSnapshot'" +
-			"any more.\n\n");
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index ee64748..f09b455 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -151,13 +151,24 @@ public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadabl
 			SelfResolvingTypeSerializer<T> selfResolvingTypeSerializer = (SelfResolvingTypeSerializer<T>) newSerializer;
 			return selfResolvingTypeSerializer.resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(this);
 		}
-		// in prior versions, the compatibility check was in the serializer itself, so we
-		// delegate this call to the serializer.
-		final CompatibilityResult<T> compatibility = newSerializer.ensureCompatibility(this);

-		return compatibility.isRequiresMigration() ?
-			TypeSerializerSchemaCompatibility.incompatible() :
-			TypeSerializerSchemaCompatibility.compatibleAsIs();
+		// we reach here if:
+		// - this legacy config snapshot did not override #resolveSchemaCompatibility to redirect
+		// the compatibility check to a new TypeSerializerSnapshot
+		// - the corresponding newSerializer does not make use of the SelfResolvingTypeSerializer
+		// to assist with the redirection
+		throw new UnsupportedOperationException(
+			"Serializer snapshot " + getClass().getName() + " is still implementing the deprecated TypeSerializerConfigSnapshot class.\n" +
+			"Please update it to implement the TypeSerializerSnapshot interface, to enable state evolution as well as being future-proof.\n\n" +
+			"- If possible, you should try to perform the update in-place, i.e. use the same snapshot class under the same name, but change it to implement TypeSerializerSnapshot instead.\n\n" +
+			"- Otherwise, if the above isn't possible (perhaps because the new snapshot is intended to have completely\n" +
+			" different written contents or intended to have a different class name),\n" +
+			" retain the old serializer snapshot class (extending TypeSerializerConfigSnapshot) under the same name\n" +
+			" and give the updated serializer snapshot class (the one extending TypeSerializerSnapshot) a new name.\n" +
+			" Afterwards, override the TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)\n" +
+			" method on the old snapshot to perform the compatibility check based on configuration written by" +
+			" the old serializer snapshot class."
+		);
 	}

 	/**
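
The migration path described in the commit message (retain the old snapshot class under its old name, give the updated snapshot a new name, and override resolveSchemaCompatibility on the old one) might look roughly like the sketch below. This is an illustration only, not code from the commit: every type in it (Compatibility, DelimitedSerializer, Snapshot, LegacyConfigSnapshot, DelimitedSnapshotV2, DelimitedConfigSnapshot, UnmigratedConfigSnapshot) is a hypothetical, heavily simplified stand-in so the example compiles without Flink on the classpath. The real TypeSerializerSnapshot interface has further methods for reading, writing and restoring, and the real base class also checks for SelfResolvingTypeSerializer first; both are left out here.

```java
import java.util.Objects;

public class SnapshotMigrationSketch {

    // All types below are hypothetical, simplified stand-ins; they are NOT Flink's real API.
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    /** Stand-in for a TypeSerializer; it only carries the configuration being compared. */
    static final class DelimitedSerializer {
        final String delimiter;

        DelimitedSerializer(String delimiter) {
            this.delimiter = Objects.requireNonNull(delimiter);
        }
    }

    /** Stand-in for the TypeSerializerSnapshot interface. */
    interface Snapshot {
        Compatibility resolveSchemaCompatibility(DelimitedSerializer newSerializer);
    }

    /** Stand-in for the deprecated TypeSerializerConfigSnapshot base class. */
    abstract static class LegacyConfigSnapshot implements Snapshot {
        // Mirrors the behavior after this commit: there is no ensureCompatibility() fallback,
        // so a legacy snapshot that does not override this method fails loudly.
        @Override
        public Compatibility resolveSchemaCompatibility(DelimitedSerializer newSerializer) {
            throw new UnsupportedOperationException(
                "Serializer snapshot " + getClass().getName()
                    + " still relies on the deprecated base class.");
        }
    }

    /** The updated snapshot, published under a new class name. */
    static final class DelimitedSnapshotV2 implements Snapshot {
        private final String delimiter;

        DelimitedSnapshotV2(String delimiter) {
            this.delimiter = Objects.requireNonNull(delimiter);
        }

        @Override
        public Compatibility resolveSchemaCompatibility(DelimitedSerializer newSerializer) {
            return delimiter.equals(newSerializer.delimiter)
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
        }
    }

    /** The old snapshot, retained under its old name so that old checkpoints can still load it. */
    static final class DelimitedConfigSnapshot extends LegacyConfigSnapshot {
        private final String delimiter; // configuration as written by the old snapshot class

        DelimitedConfigSnapshot(String delimiter) {
            this.delimiter = Objects.requireNonNull(delimiter);
        }

        @Override
        public Compatibility resolveSchemaCompatibility(DelimitedSerializer newSerializer) {
            // Redirect: run the check on the old configuration through the new snapshot class.
            return new DelimitedSnapshotV2(delimiter).resolveSchemaCompatibility(newSerializer);
        }
    }

    /** A legacy snapshot that was never migrated and does not override the check. */
    static final class UnmigratedConfigSnapshot extends LegacyConfigSnapshot { }

    public static void main(String[] args) {
        DelimitedSerializer serializer = new DelimitedSerializer(",");
        System.out.println(new DelimitedConfigSnapshot(",").resolveSchemaCompatibility(serializer));
        System.out.println(new DelimitedConfigSnapshot(";").resolveSchemaCompatibility(serializer));
        try {
            new UnmigratedConfigSnapshot().resolveSchemaCompatibility(serializer);
        } catch (UnsupportedOperationException e) {
            System.out.println("unmigrated snapshot rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the retained legacy class keeps only enough logic to hand its old configuration to the new snapshot class, which keeps the compatibility rules in one place. The unmigrated case shows why the override matters once the serializer-side fallback is gone.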