pnowojski commented on code in PR #19723: URL: https://github.com/apache/flink/pull/19723#discussion_r876798168
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { + break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new 
IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } + + private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + if (channelStateFutures.isEmpty()) { + return; + } + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(barrier.getId()); + if (channelStateFuture == null) { + return; + } + channelStateFuture.complete(null); + } + + private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier( + BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed."); + checkState( + barrier.getCheckpointOptions().isTimeoutable() + && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()); + return barrier; + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; + synchronized (buffers) { + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(checkpointId); + // The checkpoint barrier has sent to downstream, so nothing to do. + if (channelStateFuture == null) { + return; + } + + // 1. find inflightBuffers and timeout the aligned barrier to unaligned barrier + List<Buffer> inflightBuffers = new ArrayList<>(); + try { + if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) { + prioritySequenceNumber = sequenceNumber; + } + } catch (IOException e) { + channelStateFuture.completeExceptionally( + new IllegalStateException( Review Comment: > Do you mean that completeExceptionally(e) directly? Yes, unless there is a good reason for wrapping it in an `IllegalStateException` here? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org