1996fanrui commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1427479694
##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java:
##########
@@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
      * program's serializer re-serializes the data, thus converting the format during the restore
      * operation.
      *
+     * @deprecated This method has been replaced by {@link TypeSerializerSnapshot
+     *     #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
      * @param newSerializer the new serializer to check.
      * @return the serializer compatibility result.
      */
-    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer);
+    @Deprecated
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializer<T> newSerializer) {
+        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
+    }
+
+    /**
+     * Checks current serializer's compatibility to read data written by the prior serializer.
+     *
+     * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
+     * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
+     * used by the program that restores the checkpoint/savepoint. The outcome can be that the
+     * serialization format is compatible, that the program's serializer needs to reconfigure itself
+     * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
+     * that the format is outright incompatible, or that a migration needed. In the latter case, the
+     * TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
+     * program's serializer re-serializes the data, thus converting the format during the restore
+     * operation.
+     *
+     * <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
+     * details.
+     *
+     * @param oldSerializerSnapshot the old serializer snapshot to check.
+     * @return the serializer compatibility result.
+     */
+    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());

Review Comment:
After thinking more about it, I'm wondering whether we actually need a compatibility implementation for the old `resolveSchemaCompatibility(TypeSerializer)` method.

IIUC, Flink will only call the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method in the future, right?

If so, we can give the old `resolveSchemaCompatibility(TypeSerializer)` method a default implementation that throws `UnsupportedException`. It needs a default implementation so that users and developers no longer have to implement it.

For the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method, your current implementation is fine: all existing implementation classes remain supported as long as `resolveSchemaCompatibility(TypeSerializerSnapshot)` calls the old method by default.

BTW, the code comment should guide users and developers to implement the new `resolveSchemaCompatibility(TypeSerializerSnapshot)` method going forward.

WDYT?
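
To make the proposal above concrete, here is a minimal sketch of the two defaults. It uses simplified stand-in types (`Serializer`, `Snapshot`, `Compatibility`) that are hypothetical and are not Flink's real `TypeSerializer` / `TypeSerializerSnapshot` interfaces, and it assumes the exception meant in the comment is Java's `UnsupportedOperationException`.

```java
// Simplified stand-ins for illustration only; NOT Flink's real interfaces.
enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface Serializer<T> {
    Snapshot<T> snapshotConfiguration();
}

interface Snapshot<T> {
    Serializer<T> restoreSerializer();

    // Old method: the default throws, so new implementations need not override it.
    @Deprecated
    default Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer) {
        throw new UnsupportedOperationException(
                "Implement resolveSchemaCompatibility(Snapshot) instead.");
    }

    // New method: the default delegates to the old one, so legacy snapshots keep working.
    default Compatibility resolveSchemaCompatibility(Snapshot<T> oldSnapshot) {
        return oldSnapshot.resolveSchemaCompatibility(restoreSerializer());
    }
}

// Legacy pair: only the old method is implemented.
final class LegacySerializer implements Serializer<String> {
    public Snapshot<String> snapshotConfiguration() { return new LegacySnapshot(); }
}

final class LegacySnapshot implements Snapshot<String> {
    public Serializer<String> restoreSerializer() { return new LegacySerializer(); }

    @Override
    @Deprecated
    public Compatibility resolveSchemaCompatibility(Serializer<String> newSerializer) {
        return newSerializer instanceof LegacySerializer
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }
}

// Modern pair: only the new method is implemented.
final class ModernSerializer implements Serializer<String> {
    public Snapshot<String> snapshotConfiguration() { return new ModernSnapshot(); }
}

final class ModernSnapshot implements Snapshot<String> {
    public Serializer<String> restoreSerializer() { return new ModernSerializer(); }

    @Override
    public Compatibility resolveSchemaCompatibility(Snapshot<String> oldSnapshot) {
        return oldSnapshot instanceof ModernSnapshot
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }
}

final class CompatibilitySketch {
    // The framework side only ever calls the new method.
    static <T> Compatibility check(Serializer<T> newSerializer, Snapshot<T> oldSnapshot) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot);
    }

    public static void main(String[] args) {
        System.out.println(check(new LegacySerializer(), new LegacySnapshot()));
        System.out.println(check(new ModernSerializer(), new ModernSnapshot()));
    }
}
```

In this sketch the caller only ever invokes the new overload. A legacy snapshot reaches its old implementation through the default delegation, while a snapshot that implements only the new overload never touches the throwing default.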