1996fanrui commented on code in PR #20137: URL: https://github.com/apache/flink/pull/20137#discussion_r1032795523
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java: ########## @@ -125,11 +162,72 @@ public void start() throws IllegalStateException { @Override public void submit(ChannelStateWriteRequest request) throws Exception { - submitInternal(request, () -> deque.add(request)); + BlockingQueue<ChannelStateWriteRequest> unreadyQueue = + unreadyQueues.get( + SubtaskID.of( + request.getJobID(), + request.getJobVertexID(), + request.getSubtaskIndex())); + checkState(unreadyQueue != null, "The subtask %s is not yet registered"); + submitInternal( + request, + () -> { + synchronized (unreadyQueue) { + // 1. unreadyQueue isn't empty, the new request must keep the order, so add + // the new request to the unreadyQueue tail. + if (!unreadyQueue.isEmpty()) { + unreadyQueue.add(request); + return; + } + // 2. unreadyQueue is empty, and new request is ready, so add it to the + // readyQueue. + if (request.getReadyFuture().isDone()) { + deque.add(request); + return; + } + // 3. unreadyQueue is empty, and new request isn't ready, so add it to the + // readyQueue, + // and register it as the first request. + unreadyQueue.add(request); + registerFirstRequestFuture(request, unreadyQueue); + } + }); + } + + private void registerFirstRequestFuture( + @Nonnull ChannelStateWriteRequest firstRequest, + BlockingQueue<ChannelStateWriteRequest> unreadyQueue) { + assert Thread.holdsLock(unreadyQueue); + checkState(firstRequest == unreadyQueue.peek(), "The request isn't the first request."); + firstRequest + .getReadyFuture() + .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue, firstRequest)) + .exceptionally( + throwable -> { + moveReadyRequestToReadyQueue(unreadyQueue, firstRequest); + return null; + }); Review Comment: When dataFuture is completed, just move the request to readyQueue. And if the dataFuture isCompletedExceptionally, the `writer.fail` will be called later. 
You can take a look at `ChannelStateWriteRequest#buildFutureWriteRequest`, so I don't think the exception needs to be handled here. ``` static ChannelStateWriteRequest buildFutureWriteRequest( JobID jobID, JobVertexID jobVertexID, int subtaskIndex, long checkpointId, String name, CompletableFuture<List<Buffer>> dataFuture, BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) { return new CheckpointInProgressRequest( name, jobID, jobVertexID, subtaskIndex, checkpointId, writer -> { checkState( dataFuture.isDone(), "It should be executed when dataFuture is done."); List<Buffer> buffers; try { buffers = dataFuture.get(); } catch (ExecutionException e) { // If dataFuture fails, fail only the single related writer writer.fail(jobID, jobVertexID, subtaskIndex, e); return; } for (Buffer buffer : buffers) { checkBufferIsBuffer(buffer); bufferConsumer.accept(writer, buffer); } }, throwable -> dataFuture.thenAccept( buffers -> { try { CloseableIterator.fromList(buffers, Buffer::recycleBuffer) .close(); } catch (Exception e) { LOG.error( "Failed to recycle the output buffer of channel state.", e); } }), dataFuture); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org