pnowojski commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876798168


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        checkState(
+                !channelStateFutures.containsKey(barrier.getId()),
+                "%s has received the checkpoint barrier %d, it maybe a bug.",
+                toString(),
+                barrier.getId());
+
+        checkChannelStateFutures(barrier.getId());
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        channelStateFutures.put(barrier.getId(), dataFuture);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                dataFuture);
+    }
+
+    private void checkChannelStateFutures(long currentCheckpointId) {
+        assert Thread.holdsLock(buffers);
+
+        while (!channelStateFutures.isEmpty()) {
+            Long checkpointId = channelStateFutures.firstKey();
+            if (checkpointId >= currentCheckpointId) {
+                break;
+            }
+            String exceptionMessage =
+                    String.format(
+                            "Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
+                                    + "future by exception due to currently 
does not support "
+                                    + "concurrent unaligned checkpoints.",
+                            currentCheckpointId, checkpointId);
+            channelStateFutures
+                    .pollFirstEntry()
+                    .getValue()
+                    .completeExceptionally(new 
IllegalStateException(exceptionMessage));
+            LOG.info(exceptionMessage);
+        }
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        if (channelStateFutures.isEmpty()) {
+            return;
+        }
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                channelStateFutures.remove(barrier.getId());
+        if (channelStateFuture == null) {
+            return;
+        }
+        channelStateFuture.complete(null);
+    }
+
+    private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+            BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+        checkArgument(barrier != null, "Parse the timeoutable Checkpoint 
Barrier failed.");
+        checkState(
+                barrier.getCheckpointOptions().isTimeoutable()
+                        && 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                                == bufferConsumer.getDataType());
+        return barrier;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+        synchronized (buffers) {
+            CompletableFuture<List<Buffer>> channelStateFuture =
+                    channelStateFutures.remove(checkpointId);
+            // The checkpoint barrier has sent to downstream, so nothing to do.
+            if (channelStateFuture == null) {
+                return;
+            }
+
+            // 1. find inflightBuffers and timeout the aligned barrier to 
unaligned barrier
+            List<Buffer> inflightBuffers = new ArrayList<>();
+            try {
+                if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, 
inflightBuffers)) {
+                    prioritySequenceNumber = sequenceNumber;
+                }
+            } catch (IOException e) {
+                channelStateFuture.completeExceptionally(
+                        new IllegalStateException(

Review Comment:
   > Do you mean that completeExceptionally(e) directly?
   
   Yes, unless there is a good reason for wrapping it in an `IllegalStateException` 
here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to