1996fanrui commented on code in PR #19723: URL: https://github.com/apache/flink/pull/19723#discussion_r876521519
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { + break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new 
IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } Review Comment: You are right. I used `TreeMap` because `ChannelStateWriteRequestDispatcherImpl` uses `Map<Long, ChannelStateCheckpointWriter> writers;`. Flink does not currently support concurrent unaligned checkpoints, but I guess it might be supported in the future. Please help double-check: if you think this doesn't need to be considered, I can simplify the `TreeMap` to a single `CompletableFuture<...> channelStateFuture`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org