AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492481729



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -261,8 +260,7 @@ void registerSourceReader(ReaderInfo readerInfo) {
         * @param subtaskId the subtask id of the source reader.
         */
        void unregisterSourceReader(int subtaskId) {
-               Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format(
-                               "Failed to unregister source reader of id %s because it is not registered.", subtaskId));
+               registeredReaders.remove(subtaskId);

Review comment:
       Added a ticket and referenced it properly: https://issues.apache.org/jira/browse/FLINK-19338.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
        }
 
        /**
-        * Check whether this reader is available or not (internal use, in sync with
-        * {@link #isAvailable()}, but slightly faster).
+        * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
         *
-        * <p>Returns true only if the next buffer is an event or the reader has both available
+        * <p>Returns the next data type only if the next buffer is an event or the reader has both available
         * credits and buffers.
         *
         * @param bufferAndBacklog
         *              current buffer and backlog including information about the next buffer
+        * @return the next data type if the next buffer can be pulled immediately or null
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {

Review comment:
       I added an implNote in the javadoc.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * A deque-like data structure that supports prioritization of elements, such that they will be polled before any
+ * non-priority elements.
+ *
+ * <p>{@implNote The current implementation deliberately does not implement the respective interface to minimize the maintenance
+ * effort. Furthermore, it's optimized for handling non-priority elements, such that all operations for adding priority
+ * elements are much slower than the non-priority counterparts.}
+ *
+ * <p>Note that all element tests are performed by identity.
+ *
+ * @param <T> the element type.
+ */
+@Internal
+public final class PrioritizedDeque<T> implements Iterable<T> {
+       private final Deque<T> deque = new ArrayDeque<>();
+       private int numPriorityElements;
+
+       /**
+        * Adds a priority element to this deque, such that it will be polled after all existing priority elements but
+        * before any non-priority element.
+        *
+        * @param element the element to add
+        */
+       public void addPriorityElement(T element) {
+               // priority elements are rather rare and short-lived, so most of the time there are none
+               if (numPriorityElements == 0) {
+                       deque.addFirst(element);
+               } else if (numPriorityElements == deque.size()) {
+                       // no non-priority elements
+                       deque.add(element);
+               } else {
+                       // remove all priority elements
+                       final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements);
+                       for (int index = 0; index < numPriorityElements; index++) {
+                               priorPriority.addFirst(deque.poll());
+                       }
+                       deque.addFirst(element);
+                       // re-add them before the newly added element
+                       for (final T priorityEvent : priorPriority) {
+                               deque.addFirst(priorityEvent);
+                       }
+               }
+               numPriorityElements++;
+       }
+
+       /**
+        * Adds a non-priority element to this deque, which will be polled last.
+        *
+        * @param element the element to add
+        */
+       public void add(T element) {
+               deque.add(element);
+       }
+
+       /**
+        * Convenience method for adding an element with optional priority and prior removal.
+        *
+        * @param element the element to add
+        * @param priority flag indicating if it's a priority or non-priority element
+        * @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element.
+        */
+       public void add(T element, boolean priority, boolean alreadyContained) {

Review comment:
       I'm moving it to the commit that starts using it.
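The `PrioritizedDeque` above keeps priority elements at the head in insertion order and pays the extra cost only when a priority element is added while earlier ones are still queued. A minimal, self-contained sketch of that ordering, written for illustration only: the class name `SimplePrioritizedDeque` and the `poll` bookkeeping are assumptions, not the PR's code, and like `ArrayDeque` it rejects `null` elements.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Simplified stand-in for the PrioritizedDeque in the diff; not the Flink class. */
final class SimplePrioritizedDeque<T> implements Iterable<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    private int numPriorityElements;

    /** Polled after all existing priority elements but before any non-priority element. */
    void addPriorityElement(T element) {
        if (numPriorityElements == 0) {
            // common case: no priority elements yet, so the new one simply becomes the head
            deque.addFirst(element);
        } else if (numPriorityElements == deque.size()) {
            // only priority elements are queued: append behind them
            deque.addLast(element);
        } else {
            // take the existing priority elements off the head (the stack reverses them)
            Deque<T> stack = new ArrayDeque<>(numPriorityElements);
            for (int i = 0; i < numPriorityElements; i++) {
                stack.push(deque.pollFirst());
            }
            deque.addFirst(element);
            // put them back in front of the new element; popping restores their order
            while (!stack.isEmpty()) {
                deque.addFirst(stack.pop());
            }
        }
        numPriorityElements++;
    }

    /** Non-priority elements are appended and polled last. */
    void add(T element) {
        deque.addLast(element);
    }

    T poll() {
        T element = deque.pollFirst();
        // priority elements always sit at the head, so the polled one is priority iff any exist
        if (element != null && numPriorityElements > 0) {
            numPriorityElements--;
        }
        return element;
    }

    int getNumPriorityElements() {
        return numPriorityElements;
    }

    @Override
    public Iterator<T> iterator() {
        return deque.iterator();
    }
}
```

In this sketch, adding `n1` and `n2` and then the priority elements `p1` and `p2` polls as `p1, p2, n1, n2`.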

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -22,13 +22,10 @@
 import org.apache.flink.annotation.VisibleForTesting;

Review comment:
       I admit that it's an awkward cut. 
   
   However, it's only spilled in one place, as the `BufferReceivedListener` methods are effectively no longer called in the previous commits. I will make a later pass to make sure that all tests pass, though.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -314,6 +315,21 @@ protected StreamTask(
                }
 
                this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling"));
+
+               injectChannelStateWriterIntoChannels();
+       }
+
+       private void injectChannelStateWriterIntoChannels() {
+               final Environment env = getEnvironment();
+               final ChannelStateWriter channelStateWriter = subtaskCheckpointCoordinator.getChannelStateWriter();
+               for (final InputGate gate : env.getAllInputGates()) {
+                       gate.setChannelStateWriter(channelStateWriter);
+               }
+               for (ResultPartitionWriter writer : env.getAllWriters()) {
+                       if (writer instanceof ChannelStateHolder) {
+                               ((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
+                       }
+               }

Review comment:
       Happy to hear any other suggestions. I think the cast itself is in line with recent changes done by Stephan.
   
   The proper solution would be to inject it in the ctor, but that will not happen until we merge `runtime` and `streaming`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -232,19 +232,16 @@ public int getInputIndex() {
                        ChannelStateWriter channelStateWriter,
                        long checkpointId) throws IOException {
                for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
-                       final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
-
-                       // Assumption for retrieving buffers = one concurrent checkpoint
                        RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
                        if (deserializer != null) {
+                               final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
+
                                channelStateWriter.addInputData(
                                        checkpointId,
                                        channel.getChannelInfo(),
                                        ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                                        deserializer.getUnconsumedBuffer());
                        }
-
-                       checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);

Review comment:
       We discussed this offline, but let me summarize the main points. When spilling in the main thread, buffers would be spilled rather late, as you pointed out initially.
   One solution is to trigger the channel spilling in the task thread as soon as possible and then resume spilling from netty until the barrier arrives. However, that results in a complex threading model with lots of race conditions, as we found out in the version in master.
   Another solution is to spill in the task thread and use any poll to discover new buffers and spill them. It's slightly slower and also requires lots of internal knowledge at the `Unaligner` about the channels to work well (mark all spilled channels). It's probably also suboptimal, as new buffers are usually enqueued right after the head is polled, so one buffer is enqueued in the channel but not persisted until the new head is polled.
   The proposed solution is to spill entirely in the netty thread. That's the fastest possible solution with a comparably simple threading model. The downside is the added complexity on the channel side, but the general idea is that the upstream and downstream sides of a channel are now self-contained.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
 
        private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
                assert Thread.holdsLock(buffers);
-               if (insertAsHead) {
-                       checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                               "checkpoints");
+               if (!insertAsHead) {
+                       buffers.add(bufferConsumer);
+                       return;
+               }
+               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+                       "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
-                       for (BufferConsumer buffer : buffers) {
-                               try (BufferConsumer bc = buffer.copy()) {
-                                       if (bc.isBuffer()) {
+               final int pos = buffers.getNumPriorityElements();
+               buffers.addPriorityElement(bufferConsumer);
+
+               boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
+               if (unalignedCheckpoint) {
+                       final Iterator<BufferConsumer> iterator = buffers.iterator();
+                       Iterators.advance(iterator, pos + 1);
+                       while (iterator.hasNext()) {
+                               BufferConsumer buffer = iterator.next();
+
+                               if (buffer.isBuffer()) {
+                                       try (BufferConsumer bc = buffer.copy()) {
                                                inflightBufferSnapshot.add(bc.build());
                                        }
                                }
                        }
+               }
+       }
 
-                       buffers.addFirst(bufferConsumer);
-               } else {
-                       buffers.add(bufferConsumer);
+       private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
+               boolean unalignedCheckpoint;
+               try (BufferConsumer bc = bufferConsumer.copy()) {
+                       Buffer buffer = bc.build();
+                       try {
+                               final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                               unalignedCheckpoint = event instanceof CheckpointBarrier;
+                       } catch (IOException e) {
+                               throw new IllegalStateException("Should always be able to deserialize in-memory event", e);
+                       } finally {
+                               buffer.recycleBuffer();
+                       }
                }
+               return unalignedCheckpoint;

Review comment:
       Moved this method to the commit that spills immediately. We need it in that method to retrieve the checkpoint id to spill correctly.
   
   Deserialization is only necessary for priority events, which are very rare and rather cheap (30 bytes). I'd argue that adding a new call chain just to optimize it is not warranted.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
 
        private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
                assert Thread.holdsLock(buffers);
-               if (insertAsHead) {
-                       checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                               "checkpoints");
+               if (!insertAsHead) {
+                       buffers.add(bufferConsumer);
+                       return;
+               }
+               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+                       "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.

Review comment:
       Moved the removal to the commit that spills immediately.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
                        buffers.add(bufferConsumer);
                        return false;
                }
-               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                       "checkpoints");
 
                final int pos = buffers.getNumPriorityElements();
                buffers.addPriorityElement(bufferConsumer);
 
-               boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
-               if (unalignedCheckpoint) {
+               CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+               if (barrier != null) {
+                       checkState(
+                               barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+                               "Only unaligned checkpoints should be priority events");
                        final Iterator<BufferConsumer> iterator = buffers.iterator();
                        Iterators.advance(iterator, pos + 1);
+                       List<Buffer> inflightBuffers = new ArrayList<>();
                        while (iterator.hasNext()) {
                                BufferConsumer buffer = iterator.next();
 
                                if (buffer.isBuffer()) {
                                        try (BufferConsumer bc = buffer.copy()) {
-                                               inflightBufferSnapshot.add(bc.build());
+                                               inflightBuffers.add(bc.build());
                                        }
                                }
                        }
+                       channelStateWriter.addOutputData(
+                               barrier.getId(),
+                               subpartitionInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               inflightBuffers.toArray(new Buffer[0]));
                }
                return pos == 0;
        }
 
-       private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
-               boolean unalignedCheckpoint;
+       @Nullable
+       private CheckpointBarrier parseCheckpointBarrier(BufferConsumer bufferConsumer) {

Review comment:
       Hm, I have a hard time coming up with a better code structure. I could add the checkpoint parsing and the `if` in `addBuffer` already in the first commit, `[FLINK-19026][network] Adding PrioritizedDeque and use it in PipelinedSubpartition.`. Then this diff would only be about the persisting itself.
   
   But I was convinced that you would then be confused about why we need to parse the barrier in that commit.
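The `addBuffer` hunk relies on one invariant: the barrier is inserted behind the existing priority elements at index `pos`, so every data buffer after position `pos + 1` is in-flight data that the barrier overtakes. A minimal sketch of that step, assuming a plain `List` and a hypothetical `QueuedEntry` in place of `BufferConsumer`, and returning the collected payloads instead of calling `ChannelStateWriter.addOutputData`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a BufferConsumer: either a data buffer or an event. */
final class QueuedEntry {
    final boolean isBuffer;
    final String payload;

    QueuedEntry(boolean isBuffer, String payload) {
        this.isBuffer = isBuffer;
        this.payload = payload;
    }
}

final class BarrierSnapshot {
    /**
     * Inserts the barrier behind the first numPriority entries and returns the payloads
     * of the data buffers queued after it, i.e. the in-flight data the barrier overtakes.
     */
    static List<String> addBarrier(List<QueuedEntry> queue, int numPriority, QueuedEntry barrier) {
        final int pos = numPriority;
        queue.add(pos, barrier);

        Iterator<QueuedEntry> iterator = queue.iterator();
        // skip the earlier priority entries and the barrier itself (pos + 1 entries)
        for (int i = 0; i < pos + 1; i++) {
            iterator.next();
        }
        List<String> inflight = new ArrayList<>();
        while (iterator.hasNext()) {
            QueuedEntry entry = iterator.next();
            // as in the diff, only data buffers are collected; events are skipped
            if (entry.isBuffer) {
                inflight.add(entry.payload);
            }
        }
        return inflight;
    }
}
```

With `d1`, an event `e1` and `d2` queued and no earlier priority elements, the snapshot is `[d1, d2]` and the barrier ends up at index 0.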

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -89,6 +92,14 @@
         */
        private final int[] inputGateChannelIndexOffsets;
 
+       /**
+        * The channel from which is currently polled, which allows interleaving of
+        * {@link #queueInputGate(IndexedInputGate, boolean)} and {@link #pollNext()} (FLINK-12510 (Deadlock when reading from InputGates)).
+        */
+       @GuardedBy("inputGatesWithData")
+       @Nullable
+       private IndexedInputGate currentInputGate;
+

Review comment:
       Removed thanks to the alternative fix of FLINK-12510 (see the previous commit now).

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
##########
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws Exception {
                inputGate = createBarrierBuffer(2, sequence, validator);

Review comment:
       > A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput.
   
   The changes to the tests now handle the additionally emitted events. Imho, the tests are easier to read now (no magically disappearing buffers in the sequence).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
        }
 
        /**
-        * Check whether this reader is available or not (internal use, in sync with
-        * {@link #isAvailable()}, but slightly faster).
+        * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
         *
-        * <p>Returns true only if the next buffer is an event or the reader has both available
+        * <p>Returns the next data type only if the next buffer is an event or the reader has both available
         * credits and buffers.
         *
         * @param bufferAndBacklog
         *              current buffer and backlog including information about the next buffer
+        * @return the next data type if the next buffer can be pulled immediately or null
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {
                // BEWARE: this must be in sync with #isAvailable()!

Review comment:
       I expanded the comment (now javadoc) to clearly state the contract.
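The `BEWARE` line asks that `getNextDataType` and `#isAvailable()` be kept in sync by hand. A minimal sketch of the javadoc contract under simplified assumptions: a hypothetical `NextType` enum instead of `Buffer.DataType`, and credits as a plain `int`. Deriving the boolean from the nullable result, as done here, is one way to keep the two in sync by construction; it is a suggestion, not what the PR does.

```java
/** Hypothetical simplification of the buffer data types; Flink's Buffer.DataType differs. */
enum NextType { NONE, DATA_BUFFER, EVENT_BUFFER }

final class CreditCheck {
    /**
     * Returns the type of the next buffer if it can be sent immediately, otherwise null.
     * Events are sent without credit; data buffers need at least one credit.
     */
    static NextType getNextDataType(NextType next, int numCreditsAvailable) {
        if (next == NextType.EVENT_BUFFER) {
            return next;
        }
        if (next == NextType.DATA_BUFFER && numCreditsAvailable > 0) {
            return next;
        }
        // nothing queued, or a data buffer while the receiver has no credit left
        return null;
    }

    /** Derived from getNextDataType, so the two can never disagree. */
    static boolean isAvailable(NextType next, int numCreditsAvailable) {
        return getNextDataType(next, numCreditsAvailable) != null;
    }
}
```

Whether this derivation fits the real `isAvailable()`, which takes no arguments, is left open here.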

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -779,34 +820,56 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) {
                        }));
        }
 
-       private void queueChannel(InputChannel channel) {
-               int availableChannels;
+       private void queueChannel(InputChannel channel, boolean priority) {
 
+               CompletableFuture<?> toNotifyPriority = null;

Review comment:
       Extracted into `GateNotificationHelper`; please check whether it actually helps to reduce complexity.
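The `toNotifyPriority` variable in `queueChannel` hints at a common pattern: decide which futures to complete while holding the lock, and complete them only after leaving the `synchronized` block, so that dependent callbacks never run under the gate's lock. A minimal sketch, assuming a single availability future and string channel ids; `SketchGate` is hypothetical and does not reflect the API of `GateNotificationHelper`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical gate illustrating "decide under the lock, notify after releasing it". */
final class SketchGate {
    private final Object lock = new Object();
    private final ArrayDeque<String> channelsWithData = new ArrayDeque<>();
    private CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    CompletableFuture<Void> getAvailableFuture() {
        synchronized (lock) {
            return availableFuture;
        }
    }

    void queueChannel(String channel, boolean priority) {
        CompletableFuture<Void> toNotify = null;
        synchronized (lock) {
            boolean wasEmpty = channelsWithData.isEmpty();
            if (priority) {
                channelsWithData.addFirst(channel); // priority channels are polled first
            } else {
                channelsWithData.addLast(channel);
            }
            if (wasEmpty) {
                toNotify = availableFuture; // only remember it while holding the lock
            }
        }
        // complete outside the lock: dependent callbacks run on this thread and
        // must not execute while the gate lock is held
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    /** Returns the next channel, or null (e.g. after a spurious wake-up). */
    String pollChannel() {
        synchronized (lock) {
            String channel = channelsWithData.pollFirst();
            if (channelsWithData.isEmpty() && availableFuture.isDone()) {
                // gate drained: hand out a fresh, incomplete future to the next waiter
                availableFuture = new CompletableFuture<>();
            }
            return channel;
        }
    }
}
```

Completing outside the lock can produce a spurious wake-up if another thread drains the gate in between. In this sketch, the waiter then simply sees `null` from `pollChannel()`.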

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
##########
@@ -34,11 +30,8 @@
        void notifyDataAvailable();
 
        /**
-        * Allows the listener to react to a priority event before it is added to the outgoing buffer queue.
-        *
-        * @return true if the event has been fully processed and should not be added to the buffer queue.
+        * Called when the first priority event is added to the head of the buffer queue.
         */
-       default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException {
-               return false;
+       default void notifyPriorityEvent() {

Review comment:
       Added a few thoughts. Let me know if it makes things clearer.
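With the new signature, the listener is no longer asked whether it consumed the event. It is only told that the head of the queue changed. A minimal sketch of when that call would fire, assuming (as the `return pos == 0;` in the `addBuffer` hunk suggests) that only the first queued priority event triggers it. `SketchSubpartition` and `PriorityListener` are hypothetical, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener mirroring the new no-arg notifyPriorityEvent() contract. */
interface PriorityListener {
    void notifyDataAvailable();

    default void notifyPriorityEvent() {}
}

/** Hypothetical subpartition queue; not Flink's PipelinedSubpartition. */
final class SketchSubpartition {
    private final List<String> buffers = new ArrayList<>();
    private int numPriorityElements;
    private final PriorityListener listener;

    SketchSubpartition(PriorityListener listener) {
        this.listener = listener;
    }

    void addPriorityEvent(String event) {
        final int pos = numPriorityElements;
        buffers.add(pos, event); // queued behind earlier priority events
        numPriorityElements++;
        if (pos == 0) {
            // the first priority event is now at the head: the only case that notifies
            listener.notifyPriorityEvent();
        }
    }

    String poll() {
        if (buffers.isEmpty()) {
            return null;
        }
        if (numPriorityElements > 0) {
            numPriorityElements--;
        }
        return buffers.remove(0);
    }
}
```

A second priority event queued behind the first does not notify again. Once both are polled, the next priority event notifies again.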

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -147,6 +148,14 @@ else if (configuredInput instanceof SourceInputConfig) {
                return anyInputAvailable;
        }
 
+       @Override
+       public CompletableFuture<?> getPriorityEventAvailableFuture() {
+               return CompletableFuture.anyOf(
+                       Arrays.stream(inputProcessors)
+                               .map(inputProcessor -> inputProcessor.taskInput.getPriorityEventAvailableFuture())
+                               .toArray(CompletableFuture[]::new));

Review comment:
       Commit is removed.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
                        buffers.add(bufferConsumer);
                        return false;
                }
-               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                       "checkpoints");
 
                final int pos = buffers.getNumPriorityElements();
                buffers.addPriorityElement(bufferConsumer);
 
-               boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
-               if (unalignedCheckpoint) {
+               CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+               if (barrier != null) {
+                       checkState(
+                               barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+                               "Only unaligned checkpoints should be priority events");
                        final Iterator<BufferConsumer> iterator = buffers.iterator();
                        Iterators.advance(iterator, pos + 1);
+                       List<Buffer> inflightBuffers = new ArrayList<>();
                        while (iterator.hasNext()) {
                                BufferConsumer buffer = iterator.next();
 
                                if (buffer.isBuffer()) {
                                        try (BufferConsumer bc = buffer.copy()) {
-                                               inflightBufferSnapshot.add(bc.build());
+                                               inflightBuffers.add(bc.build());
                                        }
                                }
                        }
+                       channelStateWriter.addOutputData(
+                               barrier.getId(),
+                               subpartitionInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               inflightBuffers.toArray(new Buffer[0]));

Review comment:
       Not relevant in the final version, where the channel spills by itself (no return value on this method). I can make it clearer in the commit message if you want.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt
                }
 
                Buffer buffer = next.buffer();
-               CheckpointBarrier notifyReceivedBarrier = parseCheckpointBarrierOrNull(buffer);
-               if (notifyReceivedBarrier != null) {
-                       receivedCheckpointId = notifyReceivedBarrier.getId();
-               } else if (receivedCheckpointId < lastRequestedCheckpointId && buffer.isBuffer()) {
-                       inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(), channelInfo);
-               }
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
+               if (buffer.isBuffer()) {
+                       for (final long barrierId : pendingCheckpointBarriers) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       getChannelInfo(),
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));

Review comment:
       `LocalInputChannel` only spills while it awaits a barrier. So it spills the buffer on first sight, and it cannot do better on the downstream level. We could, of course, also move spilling of lingering buffers to the upstream side. It might also be an improvement for later.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##########
@@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt
                }
 
                Buffer buffer = next.buffer();
-               CheckpointBarrier notifyReceivedBarrier = parseCheckpointBarrierOrNull(buffer);
-               if (notifyReceivedBarrier != null) {
-                       receivedCheckpointId = notifyReceivedBarrier.getId();
-               } else if (receivedCheckpointId < lastRequestedCheckpointId && buffer.isBuffer()) {
-                       inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(), channelInfo);
-               }
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
+               if (buffer.isBuffer()) {
+                       for (final long barrierId : pendingCheckpointBarriers) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       getChannelInfo(),
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));

Review comment:
       `LocalInputChannel` only spills while it awaits a barrier. So it spills the buffer on first sight, and it cannot do better on the downstream level. We could, of course, also move spilling of lingering buffers to the upstream side. It might also be an improvement for later, but it adds quite a bit of complexity, as barriers also need to be propagated upstream.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -89,17 +96,15 @@
        /** The number of available buffers that have not been announced to the producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
-       /**
-        * The latest already triggered checkpoint id which would be updated during
-        * {@link #spillInflightBuffers(long, ChannelStateWriter)}.
-        */
-       @GuardedBy("receivedBuffers")
-       private long lastRequestedCheckpointId = -1;
+       private final BufferManager bufferManager;
 
-       /** The current received checkpoint id from the network. */
-       private long receivedCheckpointId = -1;
+       /** Stores #overtaken buffers when a checkpoint barrier is received before task thread started checkpoint. */
+       @GuardedBy("receivedBuffers")
+       private Map<Long, Integer> numBuffersOvertaken = new HashMap<>();
 
-       private final BufferManager bufferManager;
+       /** All started checkpoints where a barrier has not been received yet. */
+       @GuardedBy("receivedBuffers")
+       private Deque<Long> pendingCheckpointBarriers = new ArrayDeque<>(2);

Review comment:
       Changed it to support only one concurrent checkpoint. I also extracted the whole logic into one helper class that can be used by both `LocalInputChannel` and `RemoteInputChannel` (with synchronization).
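The `RemoteInputChannel` fields in the hunk above (`numBuffersOvertaken`, `pendingCheckpointBarriers`) cover the two orders in which the task thread and the netty thread can observe a checkpoint. A minimal sketch of that bookkeeping for a single concurrent checkpoint, under these assumptions: string payloads; a `List` standing in for the `ChannelStateWriter`; and no polling of data buffers by the task thread between barrier arrival and checkpoint start (in the diff, the barrier is queued as a priority element ahead of them). `OvertakingChannel` is hypothetical and is not the helper class mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical channel-side spilling for one concurrent checkpoint. */
final class OvertakingChannel {
    private static final long NONE = -1L;

    private final Object lock = new Object();                  // plays the role of receivedBuffers
    private final List<String> queuedData = new ArrayList<>(); // data buffers in arrival order
    private long pendingCheckpointId = NONE; // started by the task, barrier not yet received
    private long overtakingBarrierId = NONE; // barrier received before the task started it
    private int numBuffersOvertaken;
    final List<String> persisted = new ArrayList<>();          // stands in for ChannelStateWriter

    /** Netty thread: a data buffer arrived. */
    void onBuffer(String data) {
        synchronized (lock) {
            queuedData.add(data);
            if (pendingCheckpointId != NONE) {
                persisted.add(data); // in-flight for the running checkpoint: spill on arrival
            }
        }
    }

    /** Netty thread: the barrier arrived and overtakes every queued data buffer. */
    void onBarrier(long barrierId) {
        synchronized (lock) {
            if (barrierId == pendingCheckpointId) {
                pendingCheckpointId = NONE; // later buffers belong to the next checkpoint
            } else {
                overtakingBarrierId = barrierId;
                numBuffersOvertaken = queuedData.size();
            }
        }
    }

    /** Task thread: the checkpoint was started. */
    void checkpointStarted(long checkpointId) {
        synchronized (lock) {
            if (checkpointId == overtakingBarrierId) {
                // barrier came first: only the buffers it overtook are part of the checkpoint
                persisted.addAll(queuedData.subList(0, numBuffersOvertaken));
                overtakingBarrierId = NONE;
            } else {
                // barrier still in flight: spill what is queued now, then every arrival
                persisted.addAll(queuedData);
                pendingCheckpointId = checkpointId;
            }
        }
    }

    /** Task thread: consumes the oldest data buffer, or returns null if none is queued. */
    String pollData() {
        synchronized (lock) {
            return queuedData.isEmpty() ? null : queuedData.remove(0);
        }
    }
}
```

In both orders, the checkpoint ends up with the same data (`a`, `b`), while `c`, which arrives after the barrier, is excluded.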

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = buffer.retainBuffer();
+                               AbstractEvent priorityEvent = parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof CheckpointBarrier) {
+                                               final long barrierId = ((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers for this checkpoint
+                                               if (!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not yet started by task thread,
+                                                       // so remember the numbers of buffers to spill for the time when it will be started
+                                                       numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : pendingCheckpointBarriers) {
+                                                       channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }

Review comment:
       Much smaller now thanks to the helper class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                                }
 
                                wasEmpty = receivedBuffers.isEmpty();
-                               receivedBuffers.add(buffer);
 
-                               if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) {
-                                       notifyReceivedBuffer = buffer.retainBuffer();
+                               AbstractEvent priorityEvent = parsePriorityEvent(buffer);
+                               if (priorityEvent != null) {
+                                       receivedBuffers.addPriorityElement(buffer);
+                                       final int pos = receivedBuffers.getNumPriorityElements();
+                                       if (priorityEvent instanceof CheckpointBarrier) {
+                                               final long barrierId = ((CheckpointBarrier) priorityEvent).getId();
+                                               // don't spill future buffers for this checkpoint
+                                               if (!pendingCheckpointBarriers.remove(barrierId)) {
+                                                       // checkpoint was not yet started by task thread,
+                                                       // so remember the numbers of buffers to spill for the time when it will be started
+                                                       numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos);
+                                               }
+                                       }
+                                       firstPriorityEvent = pos == 1;
                                } else {
-                                       notifyReceivedBuffer = null;
+                                       receivedBuffers.add(buffer);
+                                       if (buffer.isBuffer()) {
+                                               for (final long checkpointId : pendingCheckpointBarriers) {
+                                                       channelStateWriter.addInputData(
+                                                               checkpointId,
+                                                               channelInfo,
+                                                               sequenceNumber,
+                                                               CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                                               }
+                                       }
                                }
-                               notifyReceivedBarrier = listener != null ? parseCheckpointBarrierOrNull(buffer) : null;
                        }
                        recycleBuffer = false;
 
                        ++expectedSequenceNumber;
 
+                       if (firstPriorityEvent) {
+                               notifyPriorityEvent();
+                       }
                        if (wasEmpty) {
                                notifyChannelNonEmpty();
                        }
 
                        if (backlog >= 0) {
                                onSenderBacklog(backlog);
                        }
-
-                       if (notifyReceivedBarrier != null) {
-                               receivedCheckpointId = notifyReceivedBarrier.getId();
-                               if (notifyReceivedBarrier.isCheckpoint()) {
-                                       listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
-                               }
-                       } else if (notifyReceivedBuffer != null) {
-                               listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
-                       }
                } finally {
                        if (recycleBuffer) {
                                buffer.recycleBuffer();
                        }
                }
        }
 
+       /**
+        * Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only
+        * the overtaken buffers.
+        */
+       public void checkpointStarted(CheckpointBarrier barrier) {
+               checkState(channelStateWriter != null, "Channel state writer not injected");
+               synchronized (receivedBuffers) {
+                       final Integer numBuffers = numBuffersOvertaken.get(barrier.getId());
+                       if (numBuffers != null) {
+                               // already received barrier before the task thread picked up the barrier of this or another channel
+                               spillBuffers(barrier.getId(), numBuffers);
+                       } else {
+                               // barrier not yet received, spill all current and future buffers
+                               spillBuffers(barrier.getId(), receivedBuffers.getNumUnprioritizedElements());
+                               pendingCheckpointBarriers.add(barrier.getId());
+                       }
+               }
+       }
+
+       public void checkpointStopped(long checkpointId) {
+               synchronized (receivedBuffers) {
+                       numBuffersOvertaken.remove(checkpointId);
+                       pendingCheckpointBarriers.remove(checkpointId);
+               }
+       }
+
+       private void spillBuffers(long checkpointId, int numBuffers) {

Review comment:
       I added a helper class `ChannelStatePersister` and used `persist` everywhere to keep the two names clearly separate. `Persister` works on the logical level, `Writer` on the physical implementation.
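
To illustrate the logical/physical split, here is a simplified sketch with hypothetical names. The persister tracks whether this channel's barrier is still outstanding and decides which buffers belong to a checkpoint. Its sentinel values mirror the `CHECKPOINT_COMPLETED`/`BARRIER_RECEIVED` constants in this PR's `ChannelStatePersister`. The writer interface only stores what it is handed. The real `ChannelStateWriter#addInputData` additionally takes the channel info, a sequence number, and a closeable buffer iterator. Those are omitted here, and strings stand in for buffers.

```java
import java.util.ArrayList;
import java.util.List;

/** Physical layer (hypothetical): only knows how to store data for a checkpoint. */
interface InputDataWriterSketch {
    void addInputData(long checkpointId, String buffer);
}

/** Logical layer (hypothetical): decides WHICH buffers belong to which checkpoint. */
class ChannelPersisterSketch {
    static final long CHECKPOINT_COMPLETED = -1;
    static final long BARRIER_RECEIVED = -2;

    private final InputDataWriterSketch writer;
    private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    ChannelPersisterSketch(InputDataWriterSketch writer) {
        this.writer = writer;
    }

    /** Task thread starts a checkpoint and hands over buffers already known to belong to it. */
    void startPersisting(long barrierId, List<String> knownBuffers) {
        // If this channel's barrier already arrived, no later buffer belongs to the checkpoint.
        if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
            pendingCheckpointBarrierId = barrierId;
        }
        for (String buffer : knownBuffers) {
            writer.addInputData(barrierId, buffer);
        }
    }

    /** A data buffer arrived: persist it only while the barrier is still outstanding. */
    void maybePersist(String buffer) {
        if (pendingCheckpointBarrierId >= 0) {
            writer.addInputData(pendingCheckpointBarrierId, buffer);
        }
    }

    /** The barrier arrived on this channel. */
    void barrierReceived() {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    /** Checkpoint finished or was aborted. */
    void stopPersisting() {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }
}
```

Any writer implementation (a file, a test list) can be plugged in without touching the bookkeeping.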

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                InputStatus inputStatus = inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture =
+                               inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if (!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               prioritySelectionHandler.setAvailableInput(index);
+                               priorityAvailability.getUnavailableToResetAvailable().complete(null);
+                       }, "priority event {}", index);

Review comment:
       Commit removed; in `CheckpointedInputGate`, I'm adding `gate.toString()`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                InputStatus inputStatus = inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture =
+                               inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if (!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               prioritySelectionHandler.setAvailableInput(index);
+                               priorityAvailability.getUnavailableToResetAvailable().complete(null);

Review comment:
       Moved it even further up towards `CheckpointedInputGate`. At this point, 
we need to make sure that a priority event is really at the top (hence the 
optimistic lock protocol for notification).
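
One possible reading of the "optimistic" notification, sketched under stated assumptions rather than taken from the PR. The arrival of a priority event only schedules a mail. By the time that mail runs, the task thread may already have pulled the event; the `processPriorityEvents` comment in the `CheckpointedInputGate` hunk notes exactly this. The mail therefore re-checks that a priority event is still queued before acting. All names are hypothetical, and the network thread and task thread are simulated sequentially.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;

/** Hypothetical single-threaded simulation of optimistic notification plus re-check. */
class OptimisticNotificationSketch {
    private final Deque<String> priorityEvents = new ArrayDeque<>();
    private final Deque<String> dataEvents = new ArrayDeque<>();
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    final StringBuilder handledByMail = new StringBuilder();
    int staleNotifications;

    /** "Network" side: enqueue the event and notify optimistically via a mail. */
    void onPriorityEvent(String name) {
        priorityEvents.addLast(name);
        mailbox.add(this::processPriorityEvents);
    }

    void onData(String name) {
        dataEvents.addLast(name);
    }

    /** Task thread, regular processing: priority events are always polled first. */
    String pollNext() {
        String priority = priorityEvents.pollFirst();
        return priority != null ? priority : dataEvents.pollFirst();
    }

    /** Mail body: verify the condition instead of trusting the notification. */
    private void processPriorityEvents() {
        if (priorityEvents.isEmpty()) {
            staleNotifications++; // event was already consumed before this mail ran
            return;
        }
        while (!priorityEvents.isEmpty()) {
            handledByMail.append(priorityEvents.pollFirst()).append(';');
        }
    }

    /** Task thread executes pending mails. */
    void runMails() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

A stale notification then costs only an empty check; it never processes a data element as if it were a priority event.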

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -113,7 +113,11 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
 
                        if (++numBarriersReceived == numOpenChannels) {
                                allBarriersReceivedFuture.complete(null);
-                               resetPendingCheckpoint(barrierId);
+                               for (final InputGate gate : inputGates) {
+                                       for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
+                                               gate.getChannel(index).checkpointStopped(currentCheckpointId);
+                                       }
+                               }

Review comment:
       Sorry, that was just a test commit to check whether the stuck e2e test failed because of this change. I removed it. The original change is already covered by a few unit tests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##########
@@ -361,4 +360,89 @@ public String toString() {
                                '}';
                }
        }
+
+       /**
+        * Helper class for persisting channel state via {@link ChannelStateWriter}.
+        */
+       @NotThreadSafe
+       protected final class ChannelStatePersister {
+               private static final long CHECKPOINT_COMPLETED = -1;
+
+               private static final long BARRIER_RECEIVED = -2;
+
+               /** All started checkpoints where a barrier has not been received yet. */
+               private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
+
+               /** Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. */
+               @Nullable
+               private final ChannelStateWriter channelStateWriter;
+
+               public ChannelStatePersister(@Nullable ChannelStateWriter channelStateWriter) {
+                       this.channelStateWriter = channelStateWriter;
+               }
+
+               protected void startPersisting(long barrierId, List<Buffer> knownBuffers) {
+                       checkState(isInitialized(), "Channel state writer not injected");
+
+                       if (pendingCheckpointBarrierId != BARRIER_RECEIVED) {
+                               pendingCheckpointBarrierId = barrierId;
+                       }
+                       if (knownBuffers.size() > 0) {
+                               channelStateWriter.addInputData(
+                                       barrierId,
+                                       channelInfo,
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer));
+                       }
+               }
+
+               protected boolean isInitialized() {
+                       return channelStateWriter != null;
+               }
+
+               protected void stopPersisting() {
+                       pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
+               }
+
+               protected void maybePersist(Buffer buffer) {
+                       if (pendingCheckpointBarrierId >= 0 && buffer.isBuffer()) {
+                               channelStateWriter.addInputData(
+                                       pendingCheckpointBarrierId,
+                                       getChannelInfo(),
+                                       ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                                       CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer));
+                       }
+               }
+
+               protected boolean checkForBarrier(Buffer buffer) throws IOException {
+                       final AbstractEvent priorityEvent = parsePriorityEvent(buffer);
+                       if (priorityEvent instanceof CheckpointBarrier) {
+                               pendingCheckpointBarrierId = BARRIER_RECEIVED;
+                               return true;
+                       }
+                       return false;
+               }
+
+               /**
+                * Parses the buffer as an event and returns the {@link CheckpointBarrier} if the event is indeed a barrier or
+                * returns null in all other cases.
+                */
+               @Nullable
+               protected AbstractEvent parsePriorityEvent(Buffer buffer) throws IOException {
+                       if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) {
+                               return null;
+                       }
+
+                       AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                       // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer.
+                       // we can further improve to avoid double deserialization in the future.
+                       buffer.setReaderIndex(0);
+                       return event;
+               }

Review comment:
       It's only used in this class, so I inlined it. As it stands, it really looks like part of the Persister's interface.
   
   There is similar code for `BufferConsumer` on the output side, but the implementation is too different to align (copying the buffer vs. resetting the reader index, different `EventSerializer.fromBuffer` overloads).
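
To make the "reset the reader index vs. copy the buffer" contrast concrete, here is a sketch using plain `java.nio.ByteBuffer` instead of Flink's `Buffer`/`BufferConsumer`. The two-field event layout is an assumption made only for this example. The first variant deserializes in place and then rewinds, as `parsePriorityEvent` does with `setReaderIndex(0)`. The second reads through a `duplicate()` view, which shares the content but has its own position, similar in spirit to copying the buffer.

```java
import java.nio.ByteBuffer;

/** Hypothetical event layout for this sketch: [int eventType][long checkpointId]. */
final class EventPeekSketch {
    static final int BARRIER_TYPE = 1;

    private EventPeekSketch() {}

    static ByteBuffer serializeBarrier(long checkpointId) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Long.BYTES);
        buffer.putInt(BARRIER_TYPE).putLong(checkpointId);
        buffer.flip(); // make the written bytes readable from position 0
        return buffer;
    }

    /** Input-side style: deserialize in place, then rewind so the next consumer can read again. */
    static Long peekBarrierIdAndReset(ByteBuffer buffer) {
        try {
            if (buffer.getInt() != BARRIER_TYPE) {
                return null;
            }
            return buffer.getLong();
        } finally {
            buffer.position(0); // plays the role of buffer.setReaderIndex(0) in the diff
        }
    }

    /** Output-side style: read through an independent view; the original position never moves. */
    static Long peekBarrierIdFromCopy(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // shared content, separate position and limit
        if (view.getInt() != BARRIER_TYPE) {
            return null;
        }
        return view.getLong();
    }
}
```

Both variants leave the buffer fully readable for the real consumer. The difference is whether the peek mutates shared state and must undo it, or works on a separate view.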

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
##########
@@ -120,67 +112,10 @@ public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
                assertEquals(0, output.getNumberOfEmittedRecords());
        }
 
-       @Test
-       public void testSnapshotWithTwoInputGates() throws Exception {

Review comment:
       I added this test for FLINK-18139 (input gate index issues in `Unaligner#hasInflightData`, which is called by `StreamTaskNetworkInput`), hence its location here.
   
   However, this commit removes `hasInflightData`. Spilling is now fully encapsulated in `InputChannel` + `CheckpointBarrierUnaligner`/`CheckpointedInputGate`, and this interaction is already covered by tests in `CheckpointBarrierUnalignerTest`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -812,242 +788,13 @@ public void testQueuedBuffers() throws Exception {
                }
        }
 
-       @Test
-       public void testBufferReceivedListener() throws Exception {

Review comment:
       Good catch, my intent was to delete `testBufferReceivedListener` and `testPartitionNotFoundExceptionWhileGetNextBuffer`, but not the test in between them.
   
   `testBufferReceivedListener` tests `BufferReceivedListener`, which this commit renders useless (it is removed later).
   
   `testPartitionNotFoundExceptionWhileGetNextBuffer` tests concurrent spilling of lingering buffers and receiving of such lingering buffers. Both now happen in the same thread, so the test no longer makes sense.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -62,9 +65,37 @@
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       MailboxExecutor mailboxExecutor) {
                this.inputGate = inputGate;
                this.barrierHandler = barrierHandler;
+               this.mailboxExecutor = mailboxExecutor;
+
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       /**
+        * Eagerly pulls and processes all priority events. Must be called from task thread.
+        *
+        * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+        */
+       private void processPriorityEvents() throws IOException, InterruptedException {
+               // check if the priority event is still not processed (could have been pulled before mail was being executed)
+               if (inputGate.getPriorityEventAvailableFuture().isDone()) {
+                       // process as many priority events as possible
+                       while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+                       }
+               }
+

Review comment:
       Good idea, I solved it in the following way:
   
   ```
                // check if the priority event is still not processed (could have been pulled before mail was being executed)
                boolean hasPriorityEvent = inputGate.getPriorityEventAvailableFuture().isDone();
                while (hasPriorityEvent) {
                        // process as many priority events as possible
                        final Optional<BufferOrEvent> bufferOrEventOpt = pollNext();
                        bufferOrEventOpt.ifPresent(bufferOrEvent ->
                                checkState(bufferOrEvent.hasPriority(), "Should only poll priority events"));
                        hasPriorityEvent = bufferOrEventOpt.map(BufferOrEvent::morePriorityEvents).orElse(false);
                }
   ```
   
   `checkState(!inputGate.getPriorityEventAvailableFuture().isDone())` might fail if Netty receives a new priority event and completes this availability future right after the task thread has polled the last priority event. This should happen quite often: the first barrier arrives (at that time the only priority event, so `morePriorityEvents = false`) and triggers the whole checkpointing process, and the second barrier then completes `getPriorityEventAvailableFuture`, causing a more or less immediate re-execution of this method.
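
The race described above can be reproduced deterministically in a small, self-contained sketch. The classes are hypothetical and strings stand in for `BufferOrEvent`. The concurrent Netty thread is simulated by a hook that runs right after a poll. The first barrier is drained with `morePriorityEvents = false`. A second barrier arrives in between and completes the freshly armed future, so an assertion after the loop that the future is not done would fail. Simply re-running the method picks up the new barrier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical gate holding only priority events, with an availability future as in the diff. */
class PriorityDrainSketch {
    static final class PolledEvent {
        final String name;
        final boolean morePriorityEvents;

        PolledEvent(String name, boolean morePriorityEvents) {
            this.name = name;
            this.morePriorityEvents = morePriorityEvents;
        }
    }

    private final Deque<String> priorityEvents = new ArrayDeque<>();
    private CompletableFuture<Void> priorityAvailable = new CompletableFuture<>();
    /** Hypothetical hook standing in for whatever the network thread does right after a poll. */
    Runnable afterPoll = () -> {};
    int processed;

    /** "Netty" side: enqueue and signal availability. */
    void addPriorityEvent(String name) {
        priorityEvents.addLast(name);
        priorityAvailable.complete(null);
    }

    CompletableFuture<Void> getPriorityEventAvailableFuture() {
        return priorityAvailable;
    }

    Optional<PolledEvent> pollNext() {
        String next = priorityEvents.pollFirst();
        if (next == null) {
            return Optional.empty();
        }
        boolean more = !priorityEvents.isEmpty();
        if (!more) {
            // last priority event taken: arm a fresh, not-yet-completed future
            priorityAvailable = new CompletableFuture<>();
        }
        processed++;
        afterPoll.run();
        return Optional.of(new PolledEvent(next, more));
    }

    /** Drain loop with the same shape as the snippet in the review comment. */
    void processPriorityEvents() {
        boolean hasPriorityEvent = getPriorityEventAvailableFuture().isDone();
        while (hasPriorityEvent) {
            hasPriorityEvent = pollNext().map(event -> event.morePriorityEvents).orElse(false);
        }
        // checkState(!getPriorityEventAvailableFuture().isDone()) here could fail if an
        // event arrived during the last poll.
    }
}
```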




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

