pnowojski commented on code in PR #19723: URL: https://github.com/apache/flink/pull/19723#discussion_r875761107
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java: ########## @@ -72,6 +74,51 @@ static ChannelStateWriteRequest write( (writer, buffer) -> writer.writeOutput(info, buffer)); } + static ChannelStateWriteRequest write( + long checkpointId, + ResultSubpartitionInfo info, + CompletableFuture<List<Buffer>> dataFuture) { + return buildFutureWriteRequest( + checkpointId, + "writeOutputFuture", + dataFuture, + (writer, buffer) -> writer.writeOutput(info, buffer)); + } + + static ChannelStateWriteRequest buildFutureWriteRequest( + long checkpointId, + String name, + CompletableFuture<List<Buffer>> dataFuture, + BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) { + return new CheckpointInProgressRequest( + name, + checkpointId, + writer -> { + try { + List<Buffer> buffers = dataFuture.get(); + if (buffers == null || buffers.isEmpty()) { Review Comment: nit: maybe we shouldn't support null values here? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java: ########## @@ -72,6 +74,51 @@ static ChannelStateWriteRequest write( (writer, buffer) -> writer.writeOutput(info, buffer)); } + static ChannelStateWriteRequest write( + long checkpointId, + ResultSubpartitionInfo info, + CompletableFuture<List<Buffer>> dataFuture) { + return buildFutureWriteRequest( + checkpointId, + "writeOutputFuture", + dataFuture, + (writer, buffer) -> writer.writeOutput(info, buffer)); + } + + static ChannelStateWriteRequest buildFutureWriteRequest( + long checkpointId, + String name, + CompletableFuture<List<Buffer>> dataFuture, + BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) { + return new CheckpointInProgressRequest( + name, + checkpointId, + writer -> { + try { + List<Buffer> buffers = dataFuture.get(); + if (buffers == null || buffers.isEmpty()) { + return; + } + for (Buffer buffer : buffers) { + checkBufferIsBuffer(buffer); + 
bufferConsumer.accept(writer, buffer); + } + } catch (Throwable e) { + writer.fail(e); + } + }, + throwable -> { + List<Buffer> buffers = dataFuture.get(); + if (buffers == null || buffers.isEmpty()) { + return; + } Review Comment: nit: without support for `null`, we could skip this check and just always create `CloseableIterator` even if the list is empty. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { 
+ break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } + + private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + if (channelStateFutures.isEmpty()) { + return; + } + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(barrier.getId()); + if (channelStateFuture == null) { + return; + } + channelStateFuture.complete(null); + } + + private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier( + BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed."); + checkState( + barrier.getCheckpointOptions().isTimeoutable() + && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()); + return barrier; + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; + synchronized (buffers) { + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(checkpointId); + // The checkpoint barrier has sent to downstream, so nothing to do. + if (channelStateFuture == null) { + return; + } + + // 1. 
find inflightBuffers and timeout the aligned barrier to unaligned barrier + List<Buffer> inflightBuffers = new ArrayList<>(); + try { + if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) { + prioritySequenceNumber = sequenceNumber; + } + } catch (IOException e) { + channelStateFuture.completeExceptionally( + new IllegalStateException( Review Comment: nit: why `IllegalStateException` and not `e`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = 
channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { + break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } + + private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + if (channelStateFutures.isEmpty()) { + return; + } + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(barrier.getId()); + if (channelStateFuture == null) { + return; + } + channelStateFuture.complete(null); + } + + private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier( + BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed."); + checkState( + barrier.getCheckpointOptions().isTimeoutable() + && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()); + return barrier; + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; + synchronized (buffers) { + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(checkpointId); + // The checkpoint barrier has sent to downstream, so nothing to do. + if (channelStateFuture == null) { + return; + } + + // 1. 
find inflightBuffers and timeout the aligned barrier to unaligned barrier + List<Buffer> inflightBuffers = new ArrayList<>(); + try { + if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) { + prioritySequenceNumber = sequenceNumber; + } + } catch (IOException e) { + channelStateFuture.completeExceptionally( + new IllegalStateException( + "Timeout the aligned barrier to unaligned barrier failed.", e)); + LOG.info("Timeout the aligned barrier to unaligned barrier failed.", e); Review Comment: nit: do we need this log message? It doesn't add any new information, right? This exception should be caught and logged higher in the call stack anyway. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java: ########## @@ -347,6 +357,129 @@ public void testProducerFailedException() { assertTrue(view.getFailureCause() instanceof CancelTaskException); } + @Test + public void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + + // test without data buffer + long checkpointId = 5L; + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(1, subpartition.getNumberOfQueuedBuffers()); + assertEquals(1, subpartition.getChannelStateFutures().size()); + CompletableFuture<List<Buffer>> channelStateFuture = + subpartition.getChannelStateFutures().get(checkpointId); + assertFalse(channelStateFuture.isDone()); + + ResultSubpartition.BufferAndBacklog barrierBuffer = subpartition.pollBuffer(); + assertNotNull(barrierBuffer); + assertEquals( + Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, + barrierBuffer.buffer().getDataType()); + + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, 
subpartition.getNumberOfQueuedBuffers()); + assertNull(channelStateFuture.get()); + + // test with data buffer + subpartition.resumeConsumption(); + checkpointId = 6; + subpartition.add(createFilledFinishedBufferConsumer(4096)); + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(1, subpartition.getChannelStateFutures().size()); + assertEquals(2, subpartition.getNumberOfQueuedBuffers()); + channelStateFuture = subpartition.getChannelStateFutures().get(checkpointId); + + assertNotNull(subpartition.pollBuffer()); + assertFalse(channelStateFuture.isDone()); + barrierBuffer = subpartition.pollBuffer(); + assertNotNull(barrierBuffer); + assertEquals( + Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, + barrierBuffer.buffer().getDataType()); + + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + assertNull(channelStateFuture.get()); + } + + @Test + public void testTimeoutAlignedToUnalignedBarrier() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + + // test without data buffer + long checkpointId = 7L; + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(1, subpartition.getChannelStateFutures().size()); + assertEquals(1, subpartition.getNumberOfQueuedBuffers()); + CompletableFuture<List<Buffer>> channelStateFuture = + subpartition.getChannelStateFutures().get(checkpointId); + assertFalse(channelStateFuture.isDone()); Review Comment: I see those three assertions repeated over and over again. Maybe extract them to a helper method `assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, 1, 1, false);`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -205,6 +217,9 @@ private boolean addBuffer(BufferConsumer bufferConsumer, int partialRecordLength assert Thread.holdsLock(buffers); if (bufferConsumer.getDataType().hasPriority()) { return processPriorityBuffer(bufferConsumer, partialRecordLength); + } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()) { + receiveTimeoutableCheckpointBarrier(bufferConsumer); Review Comment: nit: rename to `processTimeoutableCheckpointBarrier()` for the sake of consistency with `processPriorityBuffer`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -191,9 +205,7 @@ private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean newBufferSize = bufferSize; } - if (prioritySequenceNumber != -1) { - notifyPriorityEvent(prioritySequenceNumber); - } + notifyPriorityEvent(prioritySequenceNumber); Review Comment: very nitty nit: for the future it would be great if you could extract such preliminary refactoring changes (like moving this if check to `notifyPriorityEvent()` or creating `needNotifyPriorityEvent()`) to another commit. Such changes are super easy to review on their own, while if they are bundled in a larger commit adding some complex feature they start to create noticeable noise. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { + break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } Review Comment: Why do we need `channelStateFutures` to be a `TreeMap` if we support only one concurrent checkpoint? Couldn't we drop it altogether and have only single `CompletableFuture<...> channelStateFuture` for the currently ongoing checkpoint and simplify this code a bit? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java: ########## @@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial inflightBuffers.toArray(new Buffer[0])); } } - return numPriorityElements == 1 - && !isBlocked; // if subpartition is blocked then downstream doesn't expect any - // notifications + return needNotifyPriorityEvent(); + } + + // It just be called after add priorityEvent. + private boolean needNotifyPriorityEvent() { + assert Thread.holdsLock(buffers); + // if subpartition is blocked then downstream doesn't expect any notifications + return buffers.getNumPriorityElements() == 1 && !isBlocked; + } + + private void receiveTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + checkState( + !channelStateFutures.containsKey(barrier.getId()), + "%s has received the checkpoint barrier %d, it maybe a bug.", + toString(), + barrier.getId()); + + checkChannelStateFutures(barrier.getId()); + CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>(); + channelStateFutures.put(barrier.getId(), dataFuture); + channelStateWriter.addOutputDataFuture( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + dataFuture); + } + + private void checkChannelStateFutures(long currentCheckpointId) { + assert Thread.holdsLock(buffers); + + while (!channelStateFutures.isEmpty()) { + Long checkpointId = channelStateFutures.firstKey(); + if (checkpointId >= currentCheckpointId) { + break; + } + String exceptionMessage = + String.format( + "Received the barrier of checkpointId=%d, complete checkpointId=%d " + + "future by exception due to currently does not support " + + "concurrent unaligned checkpoints.", + currentCheckpointId, checkpointId); + channelStateFutures + .pollFirstEntry() + .getValue() + .completeExceptionally(new 
IllegalStateException(exceptionMessage)); + LOG.info(exceptionMessage); + } + } + + private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) { + if (channelStateFutures.isEmpty()) { + return; + } + CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(barrier.getId()); + if (channelStateFuture == null) { + return; + } + channelStateFuture.complete(null); + } + + private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier( + BufferConsumer bufferConsumer) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed."); + checkState( + barrier.getCheckpointOptions().isTimeoutable() + && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()); + return barrier; + } + + @Override + public void alignedBarrierTimeout(long checkpointId) throws IOException { + int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER; + synchronized (buffers) { + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(checkpointId); + // The checkpoint barrier has sent to downstream, so nothing to do. + if (channelStateFuture == null) { + return; + } + + // 1. find inflightBuffers and timeout the aligned barrier to unaligned barrier + List<Buffer> inflightBuffers = new ArrayList<>(); + try { + if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) { + prioritySequenceNumber = sequenceNumber; + } + } catch (IOException e) { + channelStateFuture.completeExceptionally( + new IllegalStateException( + "Timeout the aligned barrier to unaligned barrier failed.", e)); + LOG.info("Timeout the aligned barrier to unaligned barrier failed.", e); + throw e; + } + + // 2. complete the channelStateFuture + channelStateFuture.complete(inflightBuffers); + } + + // 3. 
notify downstream read barrier, it must be called outside the buffers_lock to avoid + // the deadlock. + notifyPriorityEvent(prioritySequenceNumber); + } + + @Override + public void abortCheckpoint(long checkpointId, CheckpointException cause) { + synchronized (buffers) { + CompletableFuture<List<Buffer>> channelStateFuture = + channelStateFutures.remove(checkpointId); + // The channelStateFuture has been completed, so nothing to do. + if (channelStateFuture == null) { + return; + } + channelStateFuture.completeExceptionally(cause); + } + } + + private boolean findInflightBuffersAndMakeBarrierToPriority( + long checkpointId, List<Buffer> inflightBuffers) throws IOException { + // 1. record the buffers before barrier as inflightBuffers + final int numPriorityElements = buffers.getNumPriorityElements(); + final Iterator<BufferConsumerWithPartialRecordLength> iterator = buffers.iterator(); + Iterators.advance(iterator, numPriorityElements); + + BufferConsumerWithPartialRecordLength element = null; + CheckpointBarrier barrier = null; + while (iterator.hasNext()) { + BufferConsumerWithPartialRecordLength next = iterator.next(); + BufferConsumer bufferConsumer = next.getBufferConsumer(); + + if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER + == bufferConsumer.getDataType()) { + barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer); + if (barrier.getId() != checkpointId) { + continue; + } Review Comment: Shouldn't this actually be a `checkState`? Can this happen? I guess maybe for a checkpoint that has been aborted previously? If so, maybe add a comment explaining why we can encounter older checkpoints?
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ########## @@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @GuardedBy("lock") private boolean closed; + private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer; + + private final Clock clock; + + /** Hold the AlignmentTimer for each checkpointId. */ + @GuardedBy("lock") + private final NavigableMap<Long, Cancellable> alignmentTimers; Review Comment: Again, do we need a map here since we support only a single concurrent checkpoint? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ########## @@ -294,23 +324,37 @@ public void checkpointState( options = options.withUnalignedSupported(); initInputsCheckpoint(metadata.getCheckpointId(), options); } + if (options.isTimeoutable()) { + // The output buffer may need to be snapshotted, so start the channelStateWriter here. + channelStateWriter.start(metadata.getCheckpointId(), options); + channelStateWriter.finishInput(metadata.getCheckpointId()); + } Review Comment: Maybe refactor this code a bit. Extract this `if` check and the one above to a separate method and inline `initInputsCheckpoint`? So that we always start `channelStateWriter` in one place? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ########## @@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @GuardedBy("lock") private boolean closed; + private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer; + + private final Clock clock; + + /** Hold the AlignmentTimer for each checkpointId. */ + @GuardedBy("lock") Review Comment: Why does this need to be guarded by `lock`?
AFAIK it exists only because of `AsyncCheckpointRunnable` calling `SubtaskCheckpointCoordinatorImpl#unregisterConsumer`. All other operations are/should be from the task thread. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java: ########## @@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest( checkBufferIsBuffer(buffer); bufferConsumer.accept(writer, buffer); } - } catch (Throwable e) { + } catch (ExecutionException e) { writer.fail(e); } }, throwable -> { - List<Buffer> buffers = dataFuture.get(); - if (buffers == null || buffers.isEmpty()) { - return; + try { + List<Buffer> buffers = dataFuture.get(); + if (buffers == null || buffers.isEmpty()) { + return; + } + CloseableIterator<Buffer> iterator = + CloseableIterator.fromList(buffers, Buffer::recycleBuffer); + iterator.close(); + } catch (ExecutionException ignored) { Review Comment: Why do we need those changes? Also why are they in the commit that's adding unit tests? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.checkpointing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.streaming.runtime.tasks.TimerService; +import org.apache.flink.util.clock.Clock; + +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; + +/** Utility for barrier alignment. */ +@Internal +public class BarrierAlignmentUtil { + + public static long getTimerDelay(Clock clock, CheckpointBarrier announcedBarrier) { + long alignedCheckpointTimeout = + announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout(); + long timePassedSinceCheckpointStart = + clock.absoluteTimeMillis() - announcedBarrier.getTimestamp(); + + return Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0); + } + + public static BiFunction<Callable<?>, Duration, Cancellable> createRegisterTimerCallback( Review Comment: As you are making this more public, I would suggest to: 1. replace `BiFunction` with a named interface 2. 
add a comment explaining what this function is supposed to do ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java: ########## @@ -347,6 +357,129 @@ public void testProducerFailedException() { assertTrue(view.getFailureCause() instanceof CancelTaskException); } + @Test + public void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception { + PipelinedSubpartition subpartition = createSubpartition(); + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + + // test without data buffer + long checkpointId = 5L; + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(1, subpartition.getNumberOfQueuedBuffers()); + assertEquals(1, subpartition.getChannelStateFutures().size()); + CompletableFuture<List<Buffer>> channelStateFuture = + subpartition.getChannelStateFutures().get(checkpointId); + assertFalse(channelStateFuture.isDone()); + + ResultSubpartition.BufferAndBacklog barrierBuffer = subpartition.pollBuffer(); + assertNotNull(barrierBuffer); + assertEquals( + Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, + barrierBuffer.buffer().getDataType()); + + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + assertNull(channelStateFuture.get()); + + // test with data buffer + subpartition.resumeConsumption(); + checkpointId = 6; + subpartition.add(createFilledFinishedBufferConsumer(4096)); + subpartition.add(getTimeoutableBarrierBuffer(checkpointId)); + assertEquals(1, subpartition.getChannelStateFutures().size()); + assertEquals(2, subpartition.getNumberOfQueuedBuffers()); + channelStateFuture = subpartition.getChannelStateFutures().get(checkpointId); + + assertNotNull(subpartition.pollBuffer()); + assertFalse(channelStateFuture.isDone()); + barrierBuffer =
subpartition.pollBuffer(); + assertNotNull(barrierBuffer); + assertEquals( + Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, + barrierBuffer.buffer().getDataType()); + + assertEquals(0, subpartition.getChannelStateFutures().size()); + assertEquals(0, subpartition.getNumberOfQueuedBuffers()); + assertNull(channelStateFuture.get()); + } Review Comment: Maybe write a helper method `testWithNDataBuffers(int n, ...)`; and then call it twice: ``` testWithNDataBuffers(0, ...); testWithNDataBuffers(1, ...); ``` to deduplicate the code? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java: ########## @@ -335,6 +379,49 @@ public void checkpointState( } } + private void registerAlignmentTimer( + long checkpointId, + OperatorChain<?, ?> operatorChain, + CheckpointBarrier checkpointBarrier) { + synchronized (lock) { + resetAlignmentTimer(checkpointId); + if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) { + return; + } + + long timerDelay = BarrierAlignmentUtil.getTimerDelay(clock, checkpointBarrier); + + Cancellable currentAlignmentTimer = + registerTimer.apply( + () -> { + try { + operatorChain.alignedBarrierTimeout(checkpointId); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + synchronized (lock) { + alignmentTimers.remove(checkpointId); + } + return null; + }, + Duration.ofMillis(timerDelay)); Review Comment: note that this mailbox action/callback will be executed in the task thread, the same thread as the one calling for example `checkpointState()`. So there should be no need for acquiring `lock` anywhere here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org