This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07c763d12a9e9f76ef00049cb53dd57217962ac5
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Feb 27 12:45:29 2019 +0800

    [FLINK-11755] [core] Drop ensureCompatibility from TypeSerializer
    
    If users are still using the TypeSerializerConfigSnapshot when upgrading
    to 1.8, they have to do the following:
    
    - Change the type serializer's config snapshot to implement
      TypeSerializerSnapshot, rather than extending
      TypeSerializerConfigSnapshot (as previously).
    - If the above step was completed, then the upgrade is done.
    - Otherwise, if changing to implement TypeSerializerSnapshot directly
      in-place as the same class isn't possible (perhaps because the new
      snapshot is intended to have completely different written contents or
      intended to have a different class name),
      retain the old serializer snapshot class (extending
      TypeSerializerConfigSnapshot) under the same name and give the updated
      serializer snapshot class (the one extending TypeSerializerSnapshot)
      a new name.
    - Override the
      TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
      method to perform the compatibility check based on configuration
      written by the old serializer snapshot class.
    
    This closes #7821.
---
 .../flink/api/common/typeutils/TypeSerializer.java | 51 ++++------------------
 .../typeutils/TypeSerializerConfigSnapshot.java    | 23 +++++++---
 2 files changed, 25 insertions(+), 49 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 80b28ae..f0036c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,27 +36,20 @@ import java.io.Serializable;
  * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
  *
  * <p>This section is relevant if you implemented a TypeSerializer in Flink 
versions up to 1.6 and want
- * to adapt that implementation to the new interfaces that support proper 
state schema evolution. Please
- * follow these steps:
+ * to adapt that implementation to the new interfaces that support proper 
state schema evolution, while maintaining
+ * backwards compatibility. Please follow these steps:
  *
  * <ul>
  *     <li>Change the type serializer's config snapshot to implement {@link 
TypeSerializerSnapshot}, rather
  *     than extending {@code TypeSerializerConfigSnapshot} (as previously).
- *     <li>Move the compatibility check from the {@link 
TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)}
- *     method to the {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} method.
- * </ul>
- *
- * <p><b>Maintaining Backwards Compatibility</b>
- *
- * <p>If you want your serializer to be able to restore checkpoints from Flink 
1.6 and before, add the steps
- * below in addition to the steps above.
- *
- * <ul>
- *     <li>Retain the old serializer snapshot class (extending {@code 
TypeSerializerConfigSnapshot}) under
+ *     <li>If the above step was completed, then the upgrade is done. 
Otherwise, if changing to implement
+ *     {@link TypeSerializerSnapshot} directly in-place as the same class 
isn't possible (perhaps because the new snapshot
+ *     is intended to have completely different written contents or intended 
to have a different class name),
+ *     retain the old serializer snapshot class (extending {@code 
TypeSerializerConfigSnapshot}) under
  *     the same name and give the updated serializer snapshot class (the one 
extending {@code TypeSerializerSnapshot})
  *     a new name.
- *     <li>Keep the {@link 
TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)} on the 
TypeSerializer
- *     as well.
+ *     <li>Override the {@code 
TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)}
+ *     method to perform the compatibility check based on configuration 
written by the old serializer snapshot class.
  * </ul>
  *
  * @param <T> The data type that the serializer serializes.
@@ -203,32 +196,4 @@ public abstract class TypeSerializer<T> implements 
Serializable {
         * @return snapshot of the serializer's current configuration (cannot 
be {@code null}).
         */
        public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Deprecated methods for backwards compatibility
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * This method is deprecated. It used to resolved compatibility of the 
serializer with serializer
-        * config snapshots in checkpoints. The responsibility for this has 
moved to
-        * {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
-        *
-        * <p>New serializers should not override this method any more! 
Serializers implemented against Flink
-        * versions up to 1.6 should still work, but should adjust to new model 
to enable state evolution and
-        * be future-proof. See the class-level comments, section <i>"Upgrading 
TypeSerializers to the new
-        * TypeSerializerSnapshot model"</i> for details.
-        *
-        * @deprecated Replaced by {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
-        */
-       @Deprecated
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               throw new UnsupportedOperationException(
-                               "This method is not supported any more - please 
evolve your TypeSerializer the following way:\n\n" +
-                               "  - If you have a serializer whose 
'ensureCompatibility()' method delegates to another\n" +
-                               "    serializer's 'ensureCompatibility()', 
please use" +
-                                               
"'CompatibilityUtil.resolveCompatibilityResult(snapshot, this)' instead.\n\n" +
-                               "  - If you updated your serializer (removed 
overriding the 'ensureCompatibility()' method),\n" +
-                               "    please also update the corresponding 
config snapshot to not extend 'TypeSerializerConfigSnapshot'" +
-                                               "any more.\n\n");
-       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index ee64748..f09b455 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -151,13 +151,24 @@ public abstract class TypeSerializerConfigSnapshot<T> 
extends VersionedIOReadabl
                                SelfResolvingTypeSerializer<T> 
selfResolvingTypeSerializer = (SelfResolvingTypeSerializer<T>) newSerializer;
                                return 
selfResolvingTypeSerializer.resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(this);
                }
-               // in prior versions, the compatibility check was in the 
serializer itself, so we
-               // delegate this call to the serializer.
-               final CompatibilityResult<T> compatibility = 
newSerializer.ensureCompatibility(this);
 
-               return compatibility.isRequiresMigration() ?
-                               
TypeSerializerSchemaCompatibility.incompatible() :
-                               
TypeSerializerSchemaCompatibility.compatibleAsIs();
+               // we reach here if:
+               // - this legacy config snapshot did not override 
#resolveSchemaCompatibility to redirect
+               //   the compatibility check to a new TypeSerializerSnapshot
+               // - the corresponding newSerializer does not make use of the 
SelfResolvingTypeSerializer
+               //   to assist with the redirection
+               throw new UnsupportedOperationException(
+                       "Serializer snapshot " + getClass().getName() + " is 
still implementing the deprecated TypeSerializerConfigSnapshot class.\n" +
+                               "Please update it to implement the 
TypeSerializerSnapshot interface, to enable state evolution as well as being 
future-proof.\n\n" +
+                               "- If possible, you should try to perform the 
update in-place, i.e. use the same snapshot class under the same name, but 
change it to implement TypeSerializerSnapshot instead.\n\n" +
+                               "- Otherwise, if the above isn't possible 
(perhaps because the new snapshot is intended to have completely\n" +
+                               "  different written contents or intended to 
have a different class name),\n" +
+                               "  retain the old serializer snapshot class 
(extending TypeSerializerConfigSnapshot) under the same name\n" +
+                               "  and give the updated serializer snapshot 
class (the one extending TypeSerializerSnapshot) a new name.\n" +
+                               "  Afterwards, override the 
TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)\n" +
+                               "  method on the old snapshot to perform the 
compatibility check based on configuration written by" +
+                               "  the old serializer snapshot class."
+               );
        }
 
        /**

Reply via email to