pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012587547


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   I was suggesting `abortedCheckpointId` (or maybe `boolean isAborted`?)
instead of `maxAbortedCheckpointId` to simplify the code a bit.
`SubtaskCheckpointCoordinator` should already take care of aborted checkpoints
in advance. For example, if the current checkpoint is `42` (the one for which
`ChannelStateWriter` is persisting data) and `notifyCheckpointAborted(44)`
arrives, `SubtaskCheckpointCoordinator` will ensure that
`ChannelStateWriter#start(43)` and `ChannelStateWriter#start(44)` are never
called. So there is no need to duplicate this logic in `ChannelStateWriter`.
   
   But that probably makes only a small difference to the simplicity of the
code, so I would be fine either way.
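
   To illustrate the `42`/`44` scenario above, here is a minimal sketch. It is
not Flink's actual code: `SketchCoordinator`, `SketchChannelStateWriter`, and
`triggerCheckpoint` are hypothetical names, and the filtering rule (skip every
checkpoint id up to the highest aborted id) is an assumption taken from the
behaviour described in this comment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the channel state writer: it only records which checkpoints were started. */
class SketchChannelStateWriter {
    final List<Long> started = new ArrayList<>();

    void start(long checkpointId) {
        started.add(checkpointId);
    }
}

/** Hypothetical stand-in for the coordinator: it filters aborted checkpoints before the writer sees them. */
class SketchCoordinator {
    private final SketchChannelStateWriter writer;
    // Assumption: an abort notification for id N also rules out every id <= N.
    private long maxAbortedCheckpointId = -1;

    SketchCoordinator(SketchChannelStateWriter writer) {
        this.writer = writer;
    }

    void notifyCheckpointAborted(long checkpointId) {
        maxAbortedCheckpointId = Math.max(maxAbortedCheckpointId, checkpointId);
    }

    /** Returns true if the writer was started for this checkpoint, false if it was skipped. */
    boolean triggerCheckpoint(long checkpointId) {
        if (checkpointId <= maxAbortedCheckpointId) {
            return false; // already aborted: the writer never hears about it
        }
        writer.start(checkpointId);
        return true;
    }
}

class AbortSketchDemo {
    public static void main(String[] args) {
        SketchChannelStateWriter writer = new SketchChannelStateWriter();
        SketchCoordinator coordinator = new SketchCoordinator(writer);

        coordinator.triggerCheckpoint(42);       // writer starts persisting 42
        coordinator.notifyCheckpointAborted(44); // abort arrives while 42 is ongoing
        coordinator.triggerCheckpoint(43);       // skipped
        coordinator.triggerCheckpoint(44);       // skipped
        coordinator.triggerCheckpoint(45);       // started

        System.out.println(writer.started);
    }
}
```

   Under that assumption the writer only ever sees `start(42)` and `start(45)`,
so it would not need its own aborted-checkpoint tracking.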
