1996fanrui commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1422192659


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java:
##########
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
                                 testKeyedValueStateUpgrade(
                                         initialAccessDescriptor, newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> assertThat(e).isInstanceOf(IllegalStateException.class),
-                        e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+                        e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+                        e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }

Review Comment:
   Would you mind adding a test that checks that all state backends call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` instead of the old method?
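
   A minimal sketch of what such a test could check. It uses simplified stand-ins for `TypeSerializer`/`TypeSerializerSnapshot` (hypothetical, not Flink's real classes, and returning plain strings instead of `TypeSerializerSchemaCompatibility`). `RecordingSnapshot` counts which overload runs, and the hypothetical `restoreWith` helper stands in for a state backend's restore path; a real test would drive each backend's actual restore code instead.

   ```java
   final class NewMethodCalledTest {

       // Hypothetical stand-in for TypeSerializer.
       interface Serializer<T> {
           Snapshot<T> snapshotConfiguration();
       }

       // Hypothetical stand-in for TypeSerializerSnapshot with both overloads.
       interface Snapshot<T> {
           // Old style: invoked on the OLD snapshot with the NEW serializer.
           String resolveSchemaCompatibility(Serializer<T> newSerializer);

           // New style: invoked on the NEW snapshot with the OLD snapshot.
           String resolveSchemaCompatibility(Snapshot<T> oldSnapshot);
       }

       // Records how often each overload is invoked.
       static final class RecordingSnapshot implements Snapshot<String> {
           int oldCalls;
           int newCalls;

           @Override
           public String resolveSchemaCompatibility(Serializer<String> newSerializer) {
               oldCalls++;
               return "COMPATIBLE_AS_IS";
           }

           @Override
           public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
               newCalls++;
               return "COMPATIBLE_AS_IS";
           }
       }

       // Hypothetical backend restore path: asks the NEW snapshot about the restored (old) one.
       static <T> String restoreWith(Snapshot<T> restoredSnapshot, Serializer<T> newSerializer) {
           return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(restoredSnapshot);
       }

       static boolean backendUsesNewMethod() {
           RecordingSnapshot oldSnapshot = new RecordingSnapshot();
           RecordingSnapshot newSnapshot = new RecordingSnapshot();
           Serializer<String> newSerializer = () -> newSnapshot;
           restoreWith(oldSnapshot, newSerializer);
           // The new overload must run exactly once on the new snapshot; the old one never.
           return newSnapshot.newCalls == 1 && newSnapshot.oldCalls == 0 && oldSnapshot.oldCalls == 0;
       }

       public static void main(String[] args) {
           System.out.println(backendUsesNewMethod()); // prints "true"
       }
   }
   ```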



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java:
##########
@@ -289,6 +292,38 @@ protected void readOuterSnapshot(
             int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
             throws IOException {}
 
+    /**
+     * Checks the schema compatibility of the given old serializer snapshot based on the outer
+     * snapshot.
+     *
+     * <p>The base implementation of this method assumes that the outer serializer only has nested
+     * serializers and no extra information, and therefore the result of the check is {@link
+     * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
+     * extra information that has been persisted as part of the serializer snapshot, this must be
+     * overridden. Note that this method and the corresponding methods {@link
+     * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
+     * ClassLoader)} needs to be implemented.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+     *     information to check against.
+     * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer

Review Comment:
   ```suggestion
        * @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
   ```



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializer<T> newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
   I'm not sure whether we should add a default implementation for the new `resolveSchemaCompatibility` method. I'm worried that a default implementation may not work well in some scenarios. We currently have two `resolveSchemaCompatibility` methods, and by default each one calls the other. This can lead to a bug, for example:
   - The `TypeSerializerSnapshot` of `data1` only implements the old `resolveSchemaCompatibility(TypeSerializer)` method.
   - The `TypeSerializerSnapshot` of `data2` only implements the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method.
   
   When a Flink user upgrades the data from data2 to data1, Flink calls data1's `resolveSchemaCompatibility(TypeSerializerSnapshot)`. Since data1 does not override it, the default implementation runs and calls data2's `resolveSchemaCompatibility(TypeSerializer)`. data2 does not override that method either, so its default implementation calls data1's `resolveSchemaCompatibility(TypeSerializerSnapshot)` again.
   
   It's an infinite loop.
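   
   The loop can be reproduced outside Flink with a small model. This sketch assumes simplified stand-in interfaces (hypothetical, not Flink's classes) whose two default methods mirror the ones in the diff, with plain strings instead of `TypeSerializerSchemaCompatibility`; `Data1Snapshot` and `Data2Snapshot` are hypothetical snapshots for data1 and data2.

   ```java
   final class DefaultLoopDemo {

       // Hypothetical stand-in for TypeSerializer.
       interface Serializer<T> {
           Snapshot<T> snapshotConfiguration();
       }

       // Hypothetical stand-in for TypeSerializerSnapshot with the two defaults from the diff.
       interface Snapshot<T> {
           Serializer<T> restoreSerializer();

           // Old method: by default, delegates to the new method on the new serializer's snapshot.
           default String resolveSchemaCompatibility(Serializer<T> newSerializer) {
               return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
           }

           // New method: by default, delegates back to the old method on the old snapshot.
           default String resolveSchemaCompatibility(Snapshot<T> oldSnapshot) {
               return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
           }
       }

       // data1 (the snapshot after the upgrade) only overrides the OLD method.
       static final class Data1Snapshot implements Snapshot<String> {
           @Override
           public Serializer<String> restoreSerializer() {
               return Data1Snapshot::new; // its snapshot is a fresh Data1Snapshot
           }

           @Override
           public String resolveSchemaCompatibility(Serializer<String> newSerializer) {
               return "COMPATIBLE_AS_IS";
           }
       }

       // data2 (the restored snapshot) only overrides the NEW method.
       static final class Data2Snapshot implements Snapshot<String> {
           @Override
           public Serializer<String> restoreSerializer() {
               return Data2Snapshot::new;
           }

           @Override
           public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
               return "COMPATIBLE_AS_IS";
           }
       }

       // Returns true if resolving data2 -> data1 recurses until the stack overflows.
       static boolean upgradeLoops() {
           Snapshot<String> oldSnapshot = new Data2Snapshot();
           Snapshot<String> newSnapshot = new Data1Snapshot();
           try {
               newSnapshot.resolveSchemaCompatibility(oldSnapshot);
               return false;
           } catch (StackOverflowError e) {
               return true;
           }
       }

       public static void main(String[] args) {
           System.out.println(upgradeLoops()); // prints "true"
       }
   }
   ```

   Neither snapshot's overridden method is ever reached: the call alternates between the two defaults until a `StackOverflowError` is thrown.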
   
   Based on this, I suggest that all types inside the Flink project implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`. Currently, `EitherSerializerSnapshot` and `GenericArraySerializerConfigSnapshot` don't implement it. WDYT?
   
   Could we avoid providing a default implementation for the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`? The default implementation is provided because users may have custom serializers, right?
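   
   A sketch of that alternative, again with simplified stand-in types (hypothetical names, not Flink's API): the new method has no default, so every snapshot must implement it, while the deprecated method keeps a default that delegates to the new one. Delegation then only goes in one direction, so it cannot cycle.

   ```java
   final class NoDefaultForNewMethod {

       // Hypothetical stand-in for TypeSerializer.
       interface Serializer<T> {
           Snapshot<T> snapshotConfiguration();
       }

       // Hypothetical stand-in for TypeSerializerSnapshot.
       interface Snapshot<T> {
           // Deprecated entry point: keeps a default that makes one hop to the new method.
           @Deprecated
           default String resolveSchemaCompatibility(Serializer<T> newSerializer) {
               return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
           }

           // New method: abstract, so every snapshot type has to implement it.
           String resolveSchemaCompatibility(Snapshot<T> oldSnapshot);
       }

       // Implements only the new method: compatible iff the old snapshot has the same version.
       static final class VersionedSnapshot implements Snapshot<String> {
           final int version;

           VersionedSnapshot(int version) {
               this.version = version;
           }

           @Override
           public String resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
               if (oldSnapshot instanceof VersionedSnapshot
                       && ((VersionedSnapshot) oldSnapshot).version == version) {
                   return "COMPATIBLE_AS_IS";
               }
               return "INCOMPATIBLE";
           }
       }

       // Goes through the deprecated overload; it terminates after a single delegation.
       static String viaOldEntryPoint(int oldVersion, int newVersion) {
           Snapshot<String> oldSnapshot = new VersionedSnapshot(oldVersion);
           Serializer<String> newSerializer = () -> new VersionedSnapshot(newVersion);
           return oldSnapshot.resolveSchemaCompatibility(newSerializer);
       }

       public static void main(String[] args) {
           System.out.println(viaOldEntryPoint(1, 1)); // prints "COMPATIBLE_AS_IS"
           System.out.println(viaOldEntryPoint(1, 2)); // prints "INCOMPATIBLE"
       }
   }
   ```

   The cost is source compatibility: a custom snapshot that only implements the old method would no longer compile against such an interface, which is the custom-serializer concern raised above.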



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(

Review Comment:
   After this change, should all production callers call the new `resolveSchemaCompatibility` instead of the old one?
   
   For example:
   
   <img width="1366" alt="image" src="https://github.com/apache/flink/assets/38427477/9e21ad30-c2cc-4c0d-8a00-e5eaf082c011">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
