1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1422192659
########## flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java: ##########
@@ -1244,8 +1244,8 @@ void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
 testKeyedValueStateUpgrade(
 initialAccessDescriptor, newAccessDescriptorAfterRestore))
 .satisfiesAnyOf(
- e -> assertThat(e).isInstanceOf(IllegalStateException.class),
- e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+ e -> assertThat(e).isInstanceOf(StateMigrationException.class),
+ e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
 }

Review Comment:
Would you mind adding a test that checks that all state backends call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` instead of the old method?

########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java: ##########
@@ -289,6 +292,38 @@ protected void readOuterSnapshot(
 int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
 throws IOException {}
+ /**
+ * Checks the schema compatibility of the given old serializer snapshot based on the outer
+ * snapshot.
+ *
+ * <p>The base implementation of this method assumes that the outer serializer only has nested
+ * serializers and no extra information, and therefore the result of the check is {@link
+ * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
+ * extra information that has been persisted as part of the serializer snapshot, this must be
+ * overridden. Note that this method and the corresponding methods {@link
+ * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
+ * ClassLoader)} needs to be implemented.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+ * information to check against.
+ * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer

Review Comment:
```suggestion
* @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
```

########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
 * program's serializer re-serializes the data, thus converting the format during the restore
 * operation.
 *
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
 * @param newSerializer the new serializer to check.
 * @return the serializer compatibility result.
 */
- TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
- TypeSerializer<T> newSerializer);
+ @Deprecated
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializer<T> newSerializer) {
+ return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+ }
+
+ /**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+ return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
I'm not sure whether we should add a default implementation for the new `resolveSchemaCompatibility` method. I'm worried that a default implementation may not work well in some scenarios.

We now have two `resolveSchemaCompatibility` methods, and by default each one calls the other. This can cause a bug, for example:

- The `TypeSerializerSnapshot` of `data1` implements only the old `resolveSchemaCompatibility(TypeSerializer)` method.
- The `TypeSerializerSnapshot` of `data2` implements only the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method.

Suppose a Flink user upgrades the data from `data2` to `data1`. Flink calls the `resolveSchemaCompatibility(TypeSerializerSnapshot)` of `data1`, which falls back to the default implementation. So `data1` calls `data2`'s `resolveSchemaCompatibility(TypeSerializer)`, which also falls back to its default implementation and calls the `resolveSchemaCompatibility(TypeSerializerSnapshot)` of `data1` again. It's an infinite loop.

Based on this, I suggest that all types inside the Flink project implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`. Currently, `EitherSerializerSnapshot` and `GenericArraySerializerConfigSnapshot` don't implement it.

WDYT? Can we avoid providing a default implementation for the new `resolveSchemaCompatibility(TypeSerializerSnapshot)`? The default implementation is provided because users may have custom serializers, right?
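Here is a minimal, self-contained sketch of the loop described above. It makes some assumptions. `Serializer`, `Snapshot`, `Data1Snapshot`, `Data2Snapshot` and `Data1FixedSnapshot` are hypothetical, simplified stand-ins, not Flink's real interfaces. They only copy the shape of the two default methods in the diff: the old method delegates to `newSerializer.snapshotConfiguration()`, and the new method delegates to `oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer())`. Compatibility results are plain strings rather than `TypeSerializerSchemaCompatibility`.

```java
/**
 * Hypothetical, simplified model of the two default methods in the diff.
 * Serializer/Snapshot are illustrative stand-ins, not Flink's real interfaces.
 */
class DefaultMethodLoop {

    /** Stand-in for TypeSerializer: only what the loop needs. */
    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    /** Stand-in for TypeSerializerSnapshot with both overloads defaulted, as in the diff. */
    interface Snapshot {
        Serializer restoreSerializer();

        // Old method: the default asks the new serializer's snapshot (new method).
        default String resolveSchemaCompatibility(Serializer newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }

        // New method: the default asks the old snapshot (old method).
        default String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    /** "data1": overrides only the old method. */
    static final class Data1Snapshot implements Snapshot {
        @Override
        public Serializer restoreSerializer() {
            return Data1Snapshot::new; // snapshotConfiguration() yields a fresh Data1Snapshot
        }

        @Override
        public String resolveSchemaCompatibility(Serializer newSerializer) {
            return "COMPATIBLE_AS_IS";
        }
    }

    /** "data2": overrides only the new method. */
    static final class Data2Snapshot implements Snapshot {
        @Override
        public Serializer restoreSerializer() {
            return Data2Snapshot::new;
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return "COMPATIBLE_AS_IS";
        }
    }

    /** "data1" after the suggested fix: implements the new method itself. */
    static final class Data1FixedSnapshot implements Snapshot {
        @Override
        public Serializer restoreSerializer() {
            return Data1FixedSnapshot::new;
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            return "COMPATIBLE_AS_IS";
        }
    }

    /** Restores data2 state with the given new snapshot; true if the check never terminates. */
    static boolean overflowsStack(Snapshot newSnapshot) {
        Snapshot oldSnapshot = new Data2Snapshot(); // state was written with data2
        try {
            newSnapshot.resolveSchemaCompatibility(oldSnapshot);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowsStack(new Data1Snapshot()));      // true
        System.out.println(overflowsStack(new Data1FixedSnapshot())); // false
    }
}
```

The JVM does not eliminate tail calls, so the mutual recursion ends in a `StackOverflowError` rather than spinning forever. `Data1FixedSnapshot` shows that implementing the new method on the restoring side breaks the cycle.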

########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java: ##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
 * program's serializer re-serializes the data, thus converting the format during the restore
 * operation.
 *
+ * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+ * #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
 * @param newSerializer the new serializer to check.
 * @return the serializer compatibility result.
 */
- TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
- TypeSerializer<T> newSerializer);
+ @Deprecated
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+ TypeSerializer<T> newSerializer) {
+ return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+ }
+
+ /**
+ * Checks current serializer's compatibility to read data written by the prior serializer.
+ *
+ * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+ * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+ * serialization format is compatible, that the program's serializer needs to reconfigure itself
+ * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+ * that the format is outright incompatible, or that a migration needed. In the latter case, the
+ * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+ * program's serializer re-serializes the data, thus converting the format during the restore
+ * operation.
+ *
+ * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+ * details.
+ *
+ * @param oldSerializerSnapshot the old serializer snapshot to check.
+ * @return the serializer compatibility result.
+ */
+ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(

Review Comment:
After this change, should all production callers call the new `resolveSchemaCompatibility` instead of the old one? For example:

[screenshot]
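Here is a hedged sketch of what migrating a caller could look like. It uses the same kind of hypothetical stand-ins as before: `Serializer`, `Snapshot`, `RecordingSnapshot`, `resolveOld` and `resolveNew` are illustrative names, not Flink code. The new-style call mirrors the default body of the deprecated method in the diff, `newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot)`. The recording snapshot also suggests one way a test could assert which overload a caller uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins; not Flink's real TypeSerializer / TypeSerializerSnapshot. */
class CallSiteMigration {

    interface Serializer {
        Snapshot snapshotConfiguration();
    }

    interface Snapshot {
        // Old direction: the OLD snapshot is asked about the NEW serializer.
        String resolveSchemaCompatibility(Serializer newSerializer);

        // New direction: the NEW snapshot is asked about the OLD snapshot.
        String resolveSchemaCompatibility(Snapshot oldSnapshot);
    }

    /** Records which overload was invoked, so a test can assert on the call direction. */
    static final class RecordingSnapshot implements Snapshot {
        private final List<String> calls;

        RecordingSnapshot(List<String> calls) {
            this.calls = calls;
        }

        @Override
        public String resolveSchemaCompatibility(Serializer newSerializer) {
            calls.add("old");
            return "COMPATIBLE_AS_IS";
        }

        @Override
        public String resolveSchemaCompatibility(Snapshot oldSnapshot) {
            calls.add("new");
            return "COMPATIBLE_AS_IS";
        }
    }

    /** Caller shape before the change. */
    static String resolveOld(Snapshot oldSnapshot, Serializer newSerializer) {
        return oldSnapshot.resolveSchemaCompatibility(newSerializer);
    }

    /** Caller shape after the change; mirrors the deprecated method's default body in the diff. */
    static String resolveNew(Snapshot oldSnapshot, Serializer newSerializer) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot);
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Snapshot oldSnapshot = new RecordingSnapshot(calls);
        Serializer newSerializer = () -> new RecordingSnapshot(calls);

        resolveNew(oldSnapshot, newSerializer);
        System.out.println(calls); // prints [new]
    }
}
```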