1996fanrui commented on code in PR #20137:
URL: https://github.com/apache/flink/pull/20137#discussion_r1032795523


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -125,11 +162,72 @@ public void start() throws IllegalStateException {
 
     @Override
     public void submit(ChannelStateWriteRequest request) throws Exception {
-        submitInternal(request, () -> deque.add(request));
+        BlockingQueue<ChannelStateWriteRequest> unreadyQueue =
+                unreadyQueues.get(
+                        SubtaskID.of(
+                                request.getJobID(),
+                                request.getJobVertexID(),
+                                request.getSubtaskIndex()));
+        checkState(unreadyQueue != null, "The subtask %s is not yet 
registered");
+        submitInternal(
+                request,
+                () -> {
+                    synchronized (unreadyQueue) {
+                        // 1. unreadyQueue isn't empty, the new request must 
keep the order, so add
+                        // the new request to the unreadyQueue tail.
+                        if (!unreadyQueue.isEmpty()) {
+                            unreadyQueue.add(request);
+                            return;
+                        }
+                        // 2. unreadyQueue is empty, and new request is ready, 
so add it to the
+                        // readyQueue.
+                        if (request.getReadyFuture().isDone()) {
+                            deque.add(request);
+                            return;
+                        }
+                        // 3. unreadyQueue is empty, and new request isn't 
ready, so add it to the
+                        // readyQueue,
+                        // and register it as the first request.
+                        unreadyQueue.add(request);
+                        registerFirstRequestFuture(request, unreadyQueue);
+                    }
+                });
+    }
+
+    private void registerFirstRequestFuture(
+            @Nonnull ChannelStateWriteRequest firstRequest,
+            BlockingQueue<ChannelStateWriteRequest> unreadyQueue) {
+        assert Thread.holdsLock(unreadyQueue);
+        checkState(firstRequest == unreadyQueue.peek(), "The request isn't the 
first request.");
+        firstRequest
+                .getReadyFuture()
+                .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue, 
firstRequest))
+                .exceptionally(
+                        throwable -> {
+                            moveReadyRequestToReadyQueue(unreadyQueue, 
firstRequest);
+                            return null;
+                        });

Review Comment:
   When the dataFuture completes, just move the request to the readyQueue.
   
   If the dataFuture completes exceptionally, `writer.fail` will be called
   later anyway; take a look at
   `ChannelStateWriteRequest#buildFutureWriteRequest` (quoted below). So I
   don't think the exception needs to be handled here.
   
   ```
   static ChannelStateWriteRequest buildFutureWriteRequest(
           JobID jobID,
           JobVertexID jobVertexID,
           int subtaskIndex,
           long checkpointId,
           String name,
           CompletableFuture<List<Buffer>> dataFuture,
           BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
       return new CheckpointInProgressRequest(
               name,
               jobID,
               jobVertexID,
               subtaskIndex,
               checkpointId,
               writer -> {
                   checkState(
                           dataFuture.isDone(), "It should be executed when 
dataFuture is done.");
                   List<Buffer> buffers;
                   try {
                       buffers = dataFuture.get();
                   } catch (ExecutionException e) {
                       // If dataFuture fails, fail only the single related 
writer
                       writer.fail(jobID, jobVertexID, subtaskIndex, e);
                       return;
                   }
                   for (Buffer buffer : buffers) {
                       checkBufferIsBuffer(buffer);
                       bufferConsumer.accept(writer, buffer);
                   }
               },
               throwable ->
                       dataFuture.thenAccept(
                               buffers -> {
                                   try {
                                       CloseableIterator.fromList(buffers, 
Buffer::recycleBuffer)
                                               .close();
                                   } catch (Exception e) {
                                       LOG.error(
                                               "Failed to recycle the output 
buffer of channel state.",
                                               e);
                                   }
                               }),
               dataFuture);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to