1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1427479694


##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializer<T> newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   After thinking more about it, I wonder whether we really need a compatibility implementation for the old `resolveSchemaCompatibility(TypeSerializer)` method.
   
   IIUC, Flink will only call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future, right?
   
   If so, we can give the old `resolveSchemaCompatibility(TypeSerializer)` method a default implementation that throws `UnsupportedOperationException`. It needs a default implementation so that users and developers no longer have to implement it in the future.
   
   For the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method, your current implementation is fine. All existing implementation classes remain supported as long as `resolveSchemaCompatibility(TypeSerializerSnapshot)` calls the old one.
   
   BTW, the code comment should guide users or developers to implement the new 
`resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future.
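   
   To make the suggestion concrete, here is a minimal sketch using simplified stand-in types rather than the real Flink interfaces. `CompatSketch`, `Serializer`, `Snapshot`, `Compatibility`, `LegacySnapshot`, and `NewStyleSnapshot` are hypothetical names for illustration only; the real signatures are the ones in the diff above.

```java
// Sketch only: simplified stand-ins, NOT the real Flink classes.
final class CompatSketch {

    /** Stand-in for TypeSerializerSchemaCompatibility (hypothetical, reduced to two outcomes). */
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    /** Stand-in for TypeSerializer (hypothetical marker interface). */
    interface Serializer<T> {}

    /** Stand-in for TypeSerializerSnapshot (hypothetical). */
    interface Snapshot<T> {
        Serializer<T> restoreSerializer();

        /** Old method: the default only throws, so new classes need not implement it. */
        @Deprecated
        default Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer) {
            throw new UnsupportedOperationException(
                    "Implement resolveSchemaCompatibility(Snapshot) instead.");
        }

        /** New method: the default falls back to the old one, so legacy classes keep working. */
        default Compatibility resolveSchemaCompatibility(Snapshot<T> oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    static final class StringSerializer implements Serializer<String> {}

    /** Legacy class: overrides only the old method. */
    static final class LegacySnapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return new StringSerializer();
        }

        @Override
        @Deprecated
        public Compatibility resolveSchemaCompatibility(Serializer<String> newSerializer) {
            return newSerializer instanceof StringSerializer
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    /** New-style class: overrides only the new method. */
    static final class NewStyleSnapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return new StringSerializer();
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
            return oldSnapshot instanceof NewStyleSnapshot
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    public static void main(String[] args) {
        // The framework would call only the new method; the legacy class is served by the default.
        Snapshot<String> legacy = new LegacySnapshot();
        System.out.println(legacy.resolveSchemaCompatibility(new LegacySnapshot()));

        Snapshot<String> modern = new NewStyleSnapshot();
        System.out.println(modern.resolveSchemaCompatibility(new NewStyleSnapshot()));
    }
}
```

   With this shape, a class that overrides only the old method is still reachable through the new method's default, and a class that overrides only the new method never has to touch the old one. If neither is overridden, the call fails with `UnsupportedOperationException` instead of the two defaults delegating to each other.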
   
   WDYT?


