AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476683415



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -92,306 +90,173 @@
                super(toNotifyOnCheckpoint);
 
                this.taskName = taskName;
-               hasInflightBuffers = Arrays.stream(inputGates)
+               this.inputGates = inputGates;
+               storeNewBuffers = Arrays.stream(inputGates)
                        .flatMap(gate -> gate.getChannelInfos().stream())
                        .collect(Collectors.toMap(Function.identity(), info -> 
false));
-               threadSafeUnaligner = new 
ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates);
+               numOpenChannels = storeNewBuffers.size();
+               this.checkpointCoordinator = checkpointCoordinator;
        }
 
-       /**
-        * We still need to trigger checkpoint via {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
-        * while reading the first barrier from one channel, because this might 
happen
-        * earlier than the previous async trigger via mailbox by netty thread.
-        *
-        * <p>Note this is also suitable for the trigger case of local input 
channel.
-        */
        @Override
-       public void processBarrier(CheckpointBarrier receivedBarrier, 
InputChannelInfo channelInfo) throws IOException {
-               long barrierId = receivedBarrier.getId();
-               if (currentConsumedCheckpointId > barrierId || 
(currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
+       public void processBarrier(CheckpointBarrier barrier, InputChannelInfo 
channelInfo) throws IOException {
+               long barrierId = barrier.getId();
+               if (currentCheckpointId > barrierId || (currentCheckpointId == 
barrierId && !isCheckpointPending())) {
                        // ignore old and cancelled barriers
                        return;
                }
-               if (currentConsumedCheckpointId < barrierId) {
-                       currentConsumedCheckpointId = barrierId;
-                       numBarrierConsumed = 0;
-                       hasInflightBuffers.entrySet().forEach(hasInflightBuffer 
-> hasInflightBuffer.setValue(true));
+               if (currentCheckpointId < barrierId) {
+                       handleNewCheckpoint(barrier);
+                       notifyCheckpoint(barrier, 0);
                }
-               if (currentConsumedCheckpointId == barrierId) {
-                       hasInflightBuffers.put(channelInfo, false);
-                       numBarrierConsumed++;
+               if (currentCheckpointId == barrierId) {
+                       if (storeNewBuffers.put(channelInfo, false)) {
+                               LOG.debug("{}: Received barrier from channel {} 
@ {}.", taskName, channelInfo, barrierId);
+
+                               
inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx())
+                                       .spillInflightBuffers(barrierId, 
checkpointCoordinator.getChannelStateWriter());
+
+                               if (++numBarriersReceived == numOpenChannels) {
+                                       
allBarriersReceivedFuture.complete(null);
+                               }
+                       }
                }
-               threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, 
channelInfo);
        }
 
        @Override
        public void abortPendingCheckpoint(long checkpointId, 
CheckpointException exception) throws IOException {
-               threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, 
exception);
+               tryAbortPendingCheckpoint(checkpointId, exception);
 
-               if (checkpointId > currentConsumedCheckpointId) {
-                       resetPendingCheckpoint(checkpointId);
+               if (checkpointId > currentCheckpointId) {
+                       resetPendingCheckpoint();
                }
        }
 
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws IOException {
                final long cancelledId = cancelBarrier.getCheckpointId();
-               boolean shouldAbort = 
threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
+               boolean shouldAbort = setCancelledCheckpointId(cancelledId);
                if (shouldAbort) {
                        notifyAbort(
                                cancelledId,
                                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
                }
 
-               if (cancelledId >= currentConsumedCheckpointId) {
-                       resetPendingCheckpoint(cancelledId);
-                       currentConsumedCheckpointId = cancelledId;
+               if (cancelledId >= currentCheckpointId) {
+                       resetPendingCheckpoint();
+                       currentCheckpointId = cancelledId;
                }
        }
 
        @Override
        public void processEndOfPartition() throws IOException {
-               threadSafeUnaligner.onChannelClosed();
-               resetPendingCheckpoint(-1L);
+               numOpenChannels--;
+
+               resetPendingCheckpoint();
+               notifyAbort(
+                       currentCheckpointId,
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
        }
 
-       private void resetPendingCheckpoint(long checkpointId) {
-               if (isCheckpointPending()) {
-                       LOG.warn("{}: Received barrier or EndOfPartition(-1) {} 
before completing current checkpoint {}. " +
-                                       "Skipping current checkpoint.",
-                               taskName,
-                               checkpointId,
-                               currentConsumedCheckpointId);
+       private void resetPendingCheckpoint() {
+               LOG.warn("{}: Received barrier or EndOfPartition(-1) before 
completing current checkpoint {}. " +
+                               "Skipping current checkpoint.",
+                       taskName,
+                       currentCheckpointId);
 
-                       hasInflightBuffers.entrySet().forEach(hasInflightBuffer 
-> hasInflightBuffer.setValue(false));
-                       numBarrierConsumed = 0;
-               }
+               storeNewBuffers.entrySet().forEach(storeNewBuffer -> 
storeNewBuffer.setValue(false));
+               numBarriersReceived = 0;
        }
 
        @Override
        public long getLatestCheckpointId() {
-               return currentConsumedCheckpointId;
+               return currentCheckpointId;
        }
 
        @Override
        public String toString() {
-               return String.format("%s: last checkpoint: %d", taskName, 
currentConsumedCheckpointId);
+               return String.format("%s: last checkpoint: %d", taskName, 
currentCheckpointId);
        }
 
        @Override
        public void close() throws IOException {
                super.close();
-               threadSafeUnaligner.close();
-       }
-
-       @Override
-       public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
-               if (checkpointId < currentConsumedCheckpointId) {
-                       return false;
-               }
-               if (checkpointId > currentConsumedCheckpointId) {
-                       return true;
-               }
-               return hasInflightBuffers.get(channelInfo);
-       }
-
-       @Override
-       public CompletableFuture<Void> getAllBarriersReceivedFuture(long 
checkpointId) {
-               return 
threadSafeUnaligner.getAllBarriersReceivedFuture(checkpointId);
-       }
-
-       @Override
-       public Optional<BufferReceivedListener> getBufferReceivedListener() {
-               return Optional.of(threadSafeUnaligner);
+               allBarriersReceivedFuture.cancel(false);
        }
 
        @Override
        protected boolean isCheckpointPending() {
-               return numBarrierConsumed > 0;
-       }
-
-       @VisibleForTesting
-       int getNumOpenChannels() {
-               return threadSafeUnaligner.getNumOpenChannels();
-       }
-
-       @VisibleForTesting
-       ThreadSafeUnaligner getThreadSafeUnaligner() {
-               return threadSafeUnaligner;
+               return numBarriersReceived > 0;
        }
 
-       private void notifyCheckpoint(CheckpointBarrier barrier) throws 
IOException {
-               // ignore the previous triggered checkpoint by netty thread if 
it was already canceled or aborted before.
-               if (barrier.getId() >= 
threadSafeUnaligner.getCurrentCheckpointId()) {
-                       super.notifyCheckpoint(barrier, 0);
+       @Override
+       public void processBuffer(Buffer buffer, InputChannelInfo channelInfo) {
+               if (storeNewBuffers.get(channelInfo)) {
+                       
checkpointCoordinator.getChannelStateWriter().addInputData(
+                               currentCheckpointId,
+                               channelInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               ofElement(buffer.retainBuffer(), 
Buffer::recycleBuffer));

Review comment:
       Let me sketch the most general data flow:
   * We have a couple of buffers in our input channel.
   * A checkpoint is triggered by another channel. *
   * All buffers that are now pulled by `CheckpointedInputGate` from the first channel get persisted by the function above (`processBuffer`).
   * Then the barrier comes in. *
   * It overtakes all buffers and is now at the head. *
   * `CheckpointedInputGate` gets a priority notification and polls the barrier.
   * Upon dispatching, it calls `Unaligner`, which additionally spills all overtaken buffers.
   * Any further buffers are not persisted.
   
   All steps marked with * are performed in a different thread (another task's thread or a netty thread).
   
   As far as I can tell, this is equivalent to the current behavior, but I may have missed something.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to