Myasuka commented on a change in pull request #18391:
URL: https://github.com/apache/flink/pull/18391#discussion_r803672817



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -298,19 +306,24 @@ void serializeKeyedStateHandle(KeyedStateHandle 
stateHandle, DataOutputStream do
             if (stateHandle instanceof KeyGroupsSavepointStateHandle) {
                 dos.writeByte(SAVEPOINT_KEY_GROUPS_HANDLE);
             } else {
-                dos.writeByte(KEY_GROUPS_HANDLE);
+                dos.writeByte(KEY_GROUPS_HANDLE_V2);
             }
             
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
             
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
             for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
                 
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
             }
             
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+
+            // savepoint state handle would not need to persist state handle 
id out.
+            if (!(stateHandle instanceof KeyGroupsSavepointStateHandle)) {
+                dos.writeUTF(stateHandle.getStateHandleId().toString());
+            }

Review comment:
       I see, another reason that I don't want to write the state handle id is 
because that I have to introduce another `SAVEPOINT_KEY_GROUPS_HANDLE_V2`. I 
just don't want introduce to much V2 version flag to disable restoring 
savepoint of Flink-1.15 to Flink-1.14 (forward compatibilit), although this is 
not offcially supported.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to