AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r476683415
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -92,306 +90,173 @@ super(toNotifyOnCheckpoint); this.taskName = taskName; - hasInflightBuffers = Arrays.stream(inputGates) + this.inputGates = inputGates; + storeNewBuffers = Arrays.stream(inputGates) .flatMap(gate -> gate.getChannelInfos().stream()) .collect(Collectors.toMap(Function.identity(), info -> false)); - threadSafeUnaligner = new ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates); + numOpenChannels = storeNewBuffers.size(); + this.checkpointCoordinator = checkpointCoordinator; } - /** - * We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)} - * while reading the first barrier from one channel, because this might happen - * earlier than the previous async trigger via mailbox by netty thread. - * - * <p>Note this is also suitable for the trigger case of local input channel. 
- */ @Override - public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException { - long barrierId = receivedBarrier.getId(); - if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) { + public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException { + long barrierId = barrier.getId(); + if (currentCheckpointId > barrierId || (currentCheckpointId == barrierId && !isCheckpointPending())) { // ignore old and cancelled barriers return; } - if (currentConsumedCheckpointId < barrierId) { - currentConsumedCheckpointId = barrierId; - numBarrierConsumed = 0; - hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(true)); + if (currentCheckpointId < barrierId) { + handleNewCheckpoint(barrier); + notifyCheckpoint(barrier, 0); } - if (currentConsumedCheckpointId == barrierId) { - hasInflightBuffers.put(channelInfo, false); - numBarrierConsumed++; + if (currentCheckpointId == barrierId) { + if (storeNewBuffers.put(channelInfo, false)) { + LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId); + + inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx()) + .spillInflightBuffers(barrierId, checkpointCoordinator.getChannelStateWriter()); + + if (++numBarriersReceived == numOpenChannels) { + allBarriersReceivedFuture.complete(null); + } + } } - threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfo); } @Override public void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException { - threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, exception); + tryAbortPendingCheckpoint(checkpointId, exception); - if (checkpointId > currentConsumedCheckpointId) { - resetPendingCheckpoint(checkpointId); + if (checkpointId > currentCheckpointId) { + resetPendingCheckpoint(); } } @Override public 
void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException { final long cancelledId = cancelBarrier.getCheckpointId(); - boolean shouldAbort = threadSafeUnaligner.setCancelledCheckpointId(cancelledId); + boolean shouldAbort = setCancelledCheckpointId(cancelledId); if (shouldAbort) { notifyAbort( cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); } - if (cancelledId >= currentConsumedCheckpointId) { - resetPendingCheckpoint(cancelledId); - currentConsumedCheckpointId = cancelledId; + if (cancelledId >= currentCheckpointId) { + resetPendingCheckpoint(); + currentCheckpointId = cancelledId; } } @Override public void processEndOfPartition() throws IOException { - threadSafeUnaligner.onChannelClosed(); - resetPendingCheckpoint(-1L); + numOpenChannels--; + + resetPendingCheckpoint(); + notifyAbort( + currentCheckpointId, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); } - private void resetPendingCheckpoint(long checkpointId) { - if (isCheckpointPending()) { - LOG.warn("{}: Received barrier or EndOfPartition(-1) {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", - taskName, - checkpointId, - currentConsumedCheckpointId); + private void resetPendingCheckpoint() { + LOG.warn("{}: Received barrier or EndOfPartition(-1) before completing current checkpoint {}. 
" + + "Skipping current checkpoint.", + taskName, + currentCheckpointId); - hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(false)); - numBarrierConsumed = 0; - } + storeNewBuffers.entrySet().forEach(storeNewBuffer -> storeNewBuffer.setValue(false)); + numBarriersReceived = 0; } @Override public long getLatestCheckpointId() { - return currentConsumedCheckpointId; + return currentCheckpointId; } @Override public String toString() { - return String.format("%s: last checkpoint: %d", taskName, currentConsumedCheckpointId); + return String.format("%s: last checkpoint: %d", taskName, currentCheckpointId); } @Override public void close() throws IOException { super.close(); - threadSafeUnaligner.close(); - } - - @Override - public boolean hasInflightData(long checkpointId, InputChannelInfo channelInfo) { - if (checkpointId < currentConsumedCheckpointId) { - return false; - } - if (checkpointId > currentConsumedCheckpointId) { - return true; - } - return hasInflightBuffers.get(channelInfo); - } - - @Override - public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) { - return threadSafeUnaligner.getAllBarriersReceivedFuture(checkpointId); - } - - @Override - public Optional<BufferReceivedListener> getBufferReceivedListener() { - return Optional.of(threadSafeUnaligner); + allBarriersReceivedFuture.cancel(false); } @Override protected boolean isCheckpointPending() { - return numBarrierConsumed > 0; - } - - @VisibleForTesting - int getNumOpenChannels() { - return threadSafeUnaligner.getNumOpenChannels(); - } - - @VisibleForTesting - ThreadSafeUnaligner getThreadSafeUnaligner() { - return threadSafeUnaligner; + return numBarriersReceived > 0; } - private void notifyCheckpoint(CheckpointBarrier barrier) throws IOException { - // ignore the previous triggered checkpoint by netty thread if it was already canceled or aborted before. 
- if (barrier.getId() >= threadSafeUnaligner.getCurrentCheckpointId()) { - super.notifyCheckpoint(barrier, 0); + @Override + public void processBuffer(Buffer buffer, InputChannelInfo channelInfo) { + if (storeNewBuffers.get(channelInfo)) { + checkpointCoordinator.getChannelStateWriter().addInputData( + currentCheckpointId, + channelInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); Review comment: Let me sketch the most general data flow: * We have a couple of buffers in our input channel. * A checkpoint is triggered by another channel. * * All buffers that are now pulled by `CheckpointedInputGate` from the first channel get persisted by the above function. * Then the barrier comes in. * * It overtakes all buffers and is now at the head. * * `CheckpointedInputGate` gets a priority notification and polls the barrier. * Upon dispatching, it calls `Unaligner`, which additionally spills all overtaken buffers. * Further buffers are not persisted. All steps marked with * are performed in a different thread (other task thread / netty). To me, this is equivalent to the current behavior, but maybe I missed something. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org