pnowojski commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r875761107


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -72,6 +74,51 @@ static ChannelStateWriteRequest write(
                 (writer, buffer) -> writer.writeOutput(info, buffer));
     }
 
+    static ChannelStateWriteRequest write(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            CompletableFuture<List<Buffer>> dataFuture) {
+        return buildFutureWriteRequest(
+                checkpointId,
+                "writeOutputFuture",
+                dataFuture,
+                (writer, buffer) -> writer.writeOutput(info, buffer));
+    }
+
+    static ChannelStateWriteRequest buildFutureWriteRequest(
+            long checkpointId,
+            String name,
+            CompletableFuture<List<Buffer>> dataFuture,
+            BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
+        return new CheckpointInProgressRequest(
+                name,
+                checkpointId,
+                writer -> {
+                    try {
+                        List<Buffer> buffers = dataFuture.get();
+                        if (buffers == null || buffers.isEmpty()) {

Review Comment:
   nit: maybe we shouldn't support null values here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -72,6 +74,51 @@ static ChannelStateWriteRequest write(
                 (writer, buffer) -> writer.writeOutput(info, buffer));
     }
 
+    static ChannelStateWriteRequest write(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            CompletableFuture<List<Buffer>> dataFuture) {
+        return buildFutureWriteRequest(
+                checkpointId,
+                "writeOutputFuture",
+                dataFuture,
+                (writer, buffer) -> writer.writeOutput(info, buffer));
+    }
+
+    static ChannelStateWriteRequest buildFutureWriteRequest(
+            long checkpointId,
+            String name,
+            CompletableFuture<List<Buffer>> dataFuture,
+            BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
+        return new CheckpointInProgressRequest(
+                name,
+                checkpointId,
+                writer -> {
+                    try {
+                        List<Buffer> buffers = dataFuture.get();
+                        if (buffers == null || buffers.isEmpty()) {
+                            return;
+                        }
+                        for (Buffer buffer : buffers) {
+                            checkBufferIsBuffer(buffer);
+                            bufferConsumer.accept(writer, buffer);
+                        }
+                    } catch (Throwable e) {
+                        writer.fail(e);
+                    }
+                },
+                throwable -> {
+                    List<Buffer> buffers = dataFuture.get();
+                    if (buffers == null || buffers.isEmpty()) {
+                        return;
+                    }

Review Comment:
   nit: without support for `null`, we could skip this check and always 
create a `CloseableIterator`, even if the list is empty.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        checkState(
+                !channelStateFutures.containsKey(barrier.getId()),
+                "%s has received the checkpoint barrier %d, it maybe a bug.",
+                toString(),
+                barrier.getId());
+
+        checkChannelStateFutures(barrier.getId());
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        channelStateFutures.put(barrier.getId(), dataFuture);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                dataFuture);
+    }
+
+    private void checkChannelStateFutures(long currentCheckpointId) {
+        assert Thread.holdsLock(buffers);
+
+        while (!channelStateFutures.isEmpty()) {
+            Long checkpointId = channelStateFutures.firstKey();
+            if (checkpointId >= currentCheckpointId) {
+                break;
+            }
+            String exceptionMessage =
+                    String.format(
+                            "Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
+                                    + "future by exception due to currently 
does not support "
+                                    + "concurrent unaligned checkpoints.",
+                            currentCheckpointId, checkpointId);
+            channelStateFutures
+                    .pollFirstEntry()
+                    .getValue()
+                    .completeExceptionally(new 
IllegalStateException(exceptionMessage));
+            LOG.info(exceptionMessage);
+        }
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        if (channelStateFutures.isEmpty()) {
+            return;
+        }
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                channelStateFutures.remove(barrier.getId());
+        if (channelStateFuture == null) {
+            return;
+        }
+        channelStateFuture.complete(null);
+    }
+
+    private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+            BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+        checkArgument(barrier != null, "Parse the timeoutable Checkpoint 
Barrier failed.");
+        checkState(
+                barrier.getCheckpointOptions().isTimeoutable()
+                        && 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                                == bufferConsumer.getDataType());
+        return barrier;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+        synchronized (buffers) {
+            CompletableFuture<List<Buffer>> channelStateFuture =
+                    channelStateFutures.remove(checkpointId);
+            // The checkpoint barrier has sent to downstream, so nothing to do.
+            if (channelStateFuture == null) {
+                return;
+            }
+
+            // 1. find inflightBuffers and timeout the aligned barrier to 
unaligned barrier
+            List<Buffer> inflightBuffers = new ArrayList<>();
+            try {
+                if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, 
inflightBuffers)) {
+                    prioritySequenceNumber = sequenceNumber;
+                }
+            } catch (IOException e) {
+                channelStateFuture.completeExceptionally(
+                        new IllegalStateException(

Review Comment:
   nit: why `IllegalStateException` and not `e`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        checkState(
+                !channelStateFutures.containsKey(barrier.getId()),
+                "%s has received the checkpoint barrier %d, it maybe a bug.",
+                toString(),
+                barrier.getId());
+
+        checkChannelStateFutures(barrier.getId());
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        channelStateFutures.put(barrier.getId(), dataFuture);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                dataFuture);
+    }
+
+    private void checkChannelStateFutures(long currentCheckpointId) {
+        assert Thread.holdsLock(buffers);
+
+        while (!channelStateFutures.isEmpty()) {
+            Long checkpointId = channelStateFutures.firstKey();
+            if (checkpointId >= currentCheckpointId) {
+                break;
+            }
+            String exceptionMessage =
+                    String.format(
+                            "Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
+                                    + "future by exception due to currently 
does not support "
+                                    + "concurrent unaligned checkpoints.",
+                            currentCheckpointId, checkpointId);
+            channelStateFutures
+                    .pollFirstEntry()
+                    .getValue()
+                    .completeExceptionally(new 
IllegalStateException(exceptionMessage));
+            LOG.info(exceptionMessage);
+        }
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        if (channelStateFutures.isEmpty()) {
+            return;
+        }
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                channelStateFutures.remove(barrier.getId());
+        if (channelStateFuture == null) {
+            return;
+        }
+        channelStateFuture.complete(null);
+    }
+
+    private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+            BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+        checkArgument(barrier != null, "Parse the timeoutable Checkpoint 
Barrier failed.");
+        checkState(
+                barrier.getCheckpointOptions().isTimeoutable()
+                        && 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                                == bufferConsumer.getDataType());
+        return barrier;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+        synchronized (buffers) {
+            CompletableFuture<List<Buffer>> channelStateFuture =
+                    channelStateFutures.remove(checkpointId);
+            // The checkpoint barrier has sent to downstream, so nothing to do.
+            if (channelStateFuture == null) {
+                return;
+            }
+
+            // 1. find inflightBuffers and timeout the aligned barrier to 
unaligned barrier
+            List<Buffer> inflightBuffers = new ArrayList<>();
+            try {
+                if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, 
inflightBuffers)) {
+                    prioritySequenceNumber = sequenceNumber;
+                }
+            } catch (IOException e) {
+                channelStateFuture.completeExceptionally(
+                        new IllegalStateException(
+                                "Timeout the aligned barrier to unaligned 
barrier failed.", e));
+                LOG.info("Timeout the aligned barrier to unaligned barrier 
failed.", e);

Review Comment:
   nit: do we need this log message? It doesn't add any new information, right? 
This exception should be caught and logged higher in the call stack anyway.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java:
##########
@@ -347,6 +357,129 @@ public void testProducerFailedException() {
         assertTrue(view.getFailureCause() instanceof CancelTaskException);
     }
 
+    @Test
+    public void testConsumeTimeoutableCheckpointBarrierQuickly() throws 
Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+
+        // test without data buffer
+        long checkpointId = 5L;
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(1, subpartition.getNumberOfQueuedBuffers());
+        assertEquals(1, subpartition.getChannelStateFutures().size());
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                subpartition.getChannelStateFutures().get(checkpointId);
+        assertFalse(channelStateFuture.isDone());
+
+        ResultSubpartition.BufferAndBacklog barrierBuffer = 
subpartition.pollBuffer();
+        assertNotNull(barrierBuffer);
+        assertEquals(
+                Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER,
+                barrierBuffer.buffer().getDataType());
+
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+        assertNull(channelStateFuture.get());
+
+        // test with data buffer
+        subpartition.resumeConsumption();
+        checkpointId = 6;
+        subpartition.add(createFilledFinishedBufferConsumer(4096));
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(1, subpartition.getChannelStateFutures().size());
+        assertEquals(2, subpartition.getNumberOfQueuedBuffers());
+        channelStateFuture = 
subpartition.getChannelStateFutures().get(checkpointId);
+
+        assertNotNull(subpartition.pollBuffer());
+        assertFalse(channelStateFuture.isDone());
+        barrierBuffer = subpartition.pollBuffer();
+        assertNotNull(barrierBuffer);
+        assertEquals(
+                Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER,
+                barrierBuffer.buffer().getDataType());
+
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+        assertNull(channelStateFuture.get());
+    }
+
+    @Test
+    public void testTimeoutAlignedToUnalignedBarrier() throws Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+
+        // test without data buffer
+        long checkpointId = 7L;
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(1, subpartition.getChannelStateFutures().size());
+        assertEquals(1, subpartition.getNumberOfQueuedBuffers());
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                subpartition.getChannelStateFutures().get(checkpointId);
+        assertFalse(channelStateFuture.isDone());

Review Comment:
   I see those three assertions repeated over and over again. Maybe extract 
them to a helper method 
`assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, 1, 1, 
false);`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -205,6 +217,9 @@ private boolean addBuffer(BufferConsumer bufferConsumer, 
int partialRecordLength
         assert Thread.holdsLock(buffers);
         if (bufferConsumer.getDataType().hasPriority()) {
             return processPriorityBuffer(bufferConsumer, partialRecordLength);
+        } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                == bufferConsumer.getDataType()) {
+            receiveTimeoutableCheckpointBarrier(bufferConsumer);

Review Comment:
   nit: rename to `processTimeoutableCheckpointBarrier()` for the sake of 
consistency with `processPriorityBuffer`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -191,9 +205,7 @@ private int add(BufferConsumer bufferConsumer, int 
partialRecordLength, boolean
             newBufferSize = bufferSize;
         }
 
-        if (prioritySequenceNumber != -1) {
-            notifyPriorityEvent(prioritySequenceNumber);
-        }
+        notifyPriorityEvent(prioritySequenceNumber);

Review Comment:
   very nitty nit: for the future it would be great if you could extract such 
preliminary refactoring changes (like moving this if check to 
`notifyPriorityEvent()` or creating `needNotifyPriorityEvent()`) to another 
commit. Such changes are super easy to review on their own, while if they are 
bundled in a larger commit adding some complex feature they start to create 
noticeable noise.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        checkState(
+                !channelStateFutures.containsKey(barrier.getId()),
+                "%s has received the checkpoint barrier %d, it maybe a bug.",
+                toString(),
+                barrier.getId());
+
+        checkChannelStateFutures(barrier.getId());
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        channelStateFutures.put(barrier.getId(), dataFuture);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                dataFuture);
+    }
+
+    private void checkChannelStateFutures(long currentCheckpointId) {
+        assert Thread.holdsLock(buffers);
+
+        while (!channelStateFutures.isEmpty()) {
+            Long checkpointId = channelStateFutures.firstKey();
+            if (checkpointId >= currentCheckpointId) {
+                break;
+            }
+            String exceptionMessage =
+                    String.format(
+                            "Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
+                                    + "future by exception due to currently 
does not support "
+                                    + "concurrent unaligned checkpoints.",
+                            currentCheckpointId, checkpointId);
+            channelStateFutures
+                    .pollFirstEntry()
+                    .getValue()
+                    .completeExceptionally(new 
IllegalStateException(exceptionMessage));
+            LOG.info(exceptionMessage);
+        }
+    }

Review Comment:
   Why do we need `channelStateFutures` to be a `TreeMap` if we support only 
one concurrent checkpoint? Couldn't we drop it altogether and have only a single 
`CompletableFuture<...> channelStateFuture` for the currently ongoing 
checkpoint and simplify this code a bit?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        checkState(
+                !channelStateFutures.containsKey(barrier.getId()),
+                "%s has received the checkpoint barrier %d, it maybe a bug.",
+                toString(),
+                barrier.getId());
+
+        checkChannelStateFutures(barrier.getId());
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        channelStateFutures.put(barrier.getId(), dataFuture);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                dataFuture);
+    }
+
+    private void checkChannelStateFutures(long currentCheckpointId) {
+        assert Thread.holdsLock(buffers);
+
+        while (!channelStateFutures.isEmpty()) {
+            Long checkpointId = channelStateFutures.firstKey();
+            if (checkpointId >= currentCheckpointId) {
+                break;
+            }
+            String exceptionMessage =
+                    String.format(
+                            "Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
+                                    + "future by exception due to currently 
does not support "
+                                    + "concurrent unaligned checkpoints.",
+                            currentCheckpointId, checkpointId);
+            channelStateFutures
+                    .pollFirstEntry()
+                    .getValue()
+                    .completeExceptionally(new 
IllegalStateException(exceptionMessage));
+            LOG.info(exceptionMessage);
+        }
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        if (channelStateFutures.isEmpty()) {
+            return;
+        }
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                channelStateFutures.remove(barrier.getId());
+        if (channelStateFuture == null) {
+            return;
+        }
+        channelStateFuture.complete(null);
+    }
+
+    private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+            BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+        checkArgument(barrier != null, "Parse the timeoutable Checkpoint 
Barrier failed.");
+        checkState(
+                barrier.getCheckpointOptions().isTimeoutable()
+                        && 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                                == bufferConsumer.getDataType());
+        return barrier;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+        synchronized (buffers) {
+            CompletableFuture<List<Buffer>> channelStateFuture =
+                    channelStateFutures.remove(checkpointId);
+            // The checkpoint barrier has sent to downstream, so nothing to do.
+            if (channelStateFuture == null) {
+                return;
+            }
+
+            // 1. find inflightBuffers and timeout the aligned barrier to 
unaligned barrier
+            List<Buffer> inflightBuffers = new ArrayList<>();
+            try {
+                if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, 
inflightBuffers)) {
+                    prioritySequenceNumber = sequenceNumber;
+                }
+            } catch (IOException e) {
+                channelStateFuture.completeExceptionally(
+                        new IllegalStateException(
+                                "Timeout the aligned barrier to unaligned 
barrier failed.", e));
+                LOG.info("Timeout the aligned barrier to unaligned barrier 
failed.", e);
+                throw e;
+            }
+
+            // 2. complete the channelStateFuture
+            channelStateFuture.complete(inflightBuffers);
+        }
+
+        // 3. notify downstream read barrier, it must be called outside the 
buffers_lock to avoid
+        // the deadlock.
+        notifyPriorityEvent(prioritySequenceNumber);
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        synchronized (buffers) {
+            CompletableFuture<List<Buffer>> channelStateFuture =
+                    channelStateFutures.remove(checkpointId);
+            // The channelStateFuture has been completed, so nothing to do.
+            if (channelStateFuture == null) {
+                return;
+            }
+            channelStateFuture.completeExceptionally(cause);
+        }
+    }
+
+    private boolean findInflightBuffersAndMakeBarrierToPriority(
+            long checkpointId, List<Buffer> inflightBuffers) throws 
IOException {
+        // 1. record the buffers before barrier as inflightBuffers
+        final int numPriorityElements = buffers.getNumPriorityElements();
+        final Iterator<BufferConsumerWithPartialRecordLength> iterator = 
buffers.iterator();
+        Iterators.advance(iterator, numPriorityElements);
+
+        BufferConsumerWithPartialRecordLength element = null;
+        CheckpointBarrier barrier = null;
+        while (iterator.hasNext()) {
+            BufferConsumerWithPartialRecordLength next = iterator.next();
+            BufferConsumer bufferConsumer = next.getBufferConsumer();
+
+            if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                    == bufferConsumer.getDataType()) {
+                barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+                if (barrier.getId() != checkpointId) {
+                    continue;
+                }

Review Comment:
   shouldn't this actually be a `checkState`? Can this happen? I guess maybe 
for a checkpoint that has been aborted previously? If so, maybe add a comment 
explaining why we can encounter older checkpoints?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
     @GuardedBy("lock")
     private boolean closed;
 
+    private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer;
+
+    private final Clock clock;
+
+    /** Hold the AlignmentTimer for each checkpointId. */
+    @GuardedBy("lock")
+    private final NavigableMap<Long, Cancellable> alignmentTimers;

Review Comment:
   Again, do we need a map here since we support only a single concurrent 
checkpoint?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -294,23 +324,37 @@ public void checkpointState(
             options = options.withUnalignedSupported();
             initInputsCheckpoint(metadata.getCheckpointId(), options);
         }
+        if (options.isTimeoutable()) {
+            // The output buffer may need to be snapshotted, so start the 
channelStateWriter here.
+            channelStateWriter.start(metadata.getCheckpointId(), options);
+            channelStateWriter.finishInput(metadata.getCheckpointId());
+        }

Review Comment:
   Maybe refactor this code a bit. Extract this if check and the one above to a 
separate method and inline `initInputsCheckpoint`? So that we always start 
`channelStateWriter` in one place?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
     @GuardedBy("lock")
     private boolean closed;
 
+    private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer;
+
+    private final Clock clock;
+
+    /** Hold the AlignmentTimer for each checkpointId. */
+    @GuardedBy("lock")

Review Comment:
   Why does this need to be guarded by `lock`? AFAIK the lock exists only 
because of `AsyncCheckpointRunnable` calling 
`SubtaskCheckpointCoordinatorImpl#unregisterConsumer`. All other operations 
are/should be from the task thread.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                             checkBufferIsBuffer(buffer);
                             bufferConsumer.accept(writer, buffer);
                         }
-                    } catch (Throwable e) {
+                    } catch (ExecutionException e) {
                         writer.fail(e);
                     }
                 },
                 throwable -> {
-                    List<Buffer> buffers = dataFuture.get();
-                    if (buffers == null || buffers.isEmpty()) {
-                        return;
+                    try {
+                        List<Buffer> buffers = dataFuture.get();
+                        if (buffers == null || buffers.isEmpty()) {
+                            return;
+                        }
+                        CloseableIterator<Buffer> iterator =
+                                CloseableIterator.fromList(buffers, 
Buffer::recycleBuffer);
+                        iterator.close();
+                    } catch (ExecutionException ignored) {

Review Comment:
   Why do we need those changes? Also why are they in the commit that's adding 
unit tests?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.checkpointing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+import org.apache.flink.util.clock.Clock;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiFunction;
+
+/** Utility for barrier alignment. */
+@Internal
+public class BarrierAlignmentUtil {
+
+    public static long getTimerDelay(Clock clock, CheckpointBarrier 
announcedBarrier) {
+        long alignedCheckpointTimeout =
+                
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
+        long timePassedSinceCheckpointStart =
+                clock.absoluteTimeMillis() - announcedBarrier.getTimestamp();
+
+        return Math.max(alignedCheckpointTimeout - 
timePassedSinceCheckpointStart, 0);
+    }
+
+    public static BiFunction<Callable<?>, Duration, Cancellable> 
createRegisterTimerCallback(

Review Comment:
   As you are making this more public, I would suggest:
   
   1. replacing `BiFunction` with a named interface
   2. adding a comment explaining what this function is supposed to do



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java:
##########
@@ -347,6 +357,129 @@ public void testProducerFailedException() {
         assertTrue(view.getFailureCause() instanceof CancelTaskException);
     }
 
+    @Test
+    public void testConsumeTimeoutableCheckpointBarrierQuickly() throws 
Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+
+        // test without data buffer
+        long checkpointId = 5L;
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(1, subpartition.getNumberOfQueuedBuffers());
+        assertEquals(1, subpartition.getChannelStateFutures().size());
+        CompletableFuture<List<Buffer>> channelStateFuture =
+                subpartition.getChannelStateFutures().get(checkpointId);
+        assertFalse(channelStateFuture.isDone());
+
+        ResultSubpartition.BufferAndBacklog barrierBuffer = 
subpartition.pollBuffer();
+        assertNotNull(barrierBuffer);
+        assertEquals(
+                Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER,
+                barrierBuffer.buffer().getDataType());
+
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+        assertNull(channelStateFuture.get());
+
+        // test with data buffer
+        subpartition.resumeConsumption();
+        checkpointId = 6;
+        subpartition.add(createFilledFinishedBufferConsumer(4096));
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(1, subpartition.getChannelStateFutures().size());
+        assertEquals(2, subpartition.getNumberOfQueuedBuffers());
+        channelStateFuture = 
subpartition.getChannelStateFutures().get(checkpointId);
+
+        assertNotNull(subpartition.pollBuffer());
+        assertFalse(channelStateFuture.isDone());
+        barrierBuffer = subpartition.pollBuffer();
+        assertNotNull(barrierBuffer);
+        assertEquals(
+                Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER,
+                barrierBuffer.buffer().getDataType());
+
+        assertEquals(0, subpartition.getChannelStateFutures().size());
+        assertEquals(0, subpartition.getNumberOfQueuedBuffers());
+        assertNull(channelStateFuture.get());
+    }

Review Comment:
   Maybe write a helper method `testWithNDataBuffers(int n, ...)` and then 
call it twice:
   ```
   testWithNDataBuffers(0, ...);
   testWithNDataBuffers(1, ...);
   ```
   to deduplicate the code?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -335,6 +379,49 @@ public void checkpointState(
         }
     }
 
+    private void registerAlignmentTimer(
+            long checkpointId,
+            OperatorChain<?, ?> operatorChain,
+            CheckpointBarrier checkpointBarrier) {
+        synchronized (lock) {
+            resetAlignmentTimer(checkpointId);
+            if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
+                return;
+            }
+
+            long timerDelay = BarrierAlignmentUtil.getTimerDelay(clock, 
checkpointBarrier);
+
+            Cancellable currentAlignmentTimer =
+                    registerTimer.apply(
+                            () -> {
+                                try {
+                                    
operatorChain.alignedBarrierTimeout(checkpointId);
+                                } catch (Exception e) {
+                                    ExceptionUtils.rethrowIOException(e);
+                                }
+                                synchronized (lock) {
+                                    alignmentTimers.remove(checkpointId);
+                                }
+                                return null;
+                            },
+                            Duration.ofMillis(timerDelay));

Review Comment:
   Note that this mailbox action/callback will be executed in the task thread, 
the same thread as the one calling, for example, `checkpointState()`. So there 
should be no need to acquire `lock` anywhere here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to