AHeise commented on a change in pull request #13228: URL: https://github.com/apache/flink/pull/13228#discussion_r492481729
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java ########## @@ -261,8 +260,7 @@ void registerSourceReader(ReaderInfo readerInfo) { * @param subtaskId the subtask id of the source reader. */ void unregisterSourceReader(int subtaskId) { - Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format( - "Failed to unregister source reader of id %s because it is not registered.", subtaskId)); + registeredReaders.remove(subtaskId); Review comment: Added a ticket and referenced it properly https://issues.apache.org/jira/browse/FLINK-19338 . ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ########## @@ -124,23 +127,22 @@ public boolean isAvailable() { } /** - * Check whether this reader is available or not (internal use, in sync with - * {@link #isAvailable()}, but slightly faster). + * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line. * - * <p>Returns true only if the next buffer is an event or the reader has both available + * <p>Returns the next data type only if the next buffer is an event or the reader has both available * credits and buffers. * * @param bufferAndBacklog * current buffer and backlog including information about the next buffer + * @return the next data type if the next buffer can be pulled immediately or null */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { Review comment: I added an implNote in the javadoc. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.Objects; + +/** + * A deque-like data structure that supports prioritization of elements, such they will be polled before any + * non-priority elements. + * + * <p>{@implNote The current implementation deliberately does not implement the respective interface to minimize the maintenance + * effort. Furthermore, it's optimized for handling non-priority elements, such that all operations for adding priority + * elements are much slower than the non-priority counter-parts.} + * + * <p>Note that all element tests are performed by identity. + * + * @param <T> the element type. + */ +@Internal +public final class PrioritizedDeque<T> implements Iterable<T> { + private final Deque<T> deque = new ArrayDeque<>(); + private int numPriorityElements; + + /** + * Adds a priority element to this deque, such that it will be polled after all existing priority elements but + * before any non-priority element. 
+ * + * @param element the element to add + */ + public void addPriorityElement(T element) { + // priority elements are rather rare and short-lived, so most of there are none + if (numPriorityElements == 0) { + deque.addFirst(element); + } else if (numPriorityElements == deque.size()) { + // no non-priority elements + deque.add(element); + } else { + // remove all priority elements + final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements); + for (int index = 0; index < numPriorityElements; index++) { + priorPriority.addFirst(deque.poll()); + } + deque.addFirst(element); + // readd them before the newly added element + for (final T priorityEvent : priorPriority) { + deque.addFirst(priorityEvent); + } + } + numPriorityElements++; + } + + /** + * Adds a non-priority element to this deque, which will be polled last. + * + * @param element the element to add + */ + public void add(T element) { + deque.add(element); + } + + /** + * Convenience method for adding an element with optional priority and prior removal. + * + * @param element the element to add + * @param priority flag indicating if it's a priority or non-priority element + * @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element. + */ + public void add(T element, boolean priority, boolean alreadyContained) { Review comment: I'm moving it to the commit that starts using it. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -22,13 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting; Review comment: I admit that it's an awkward cut. However, it's only spilled in one place as the `BufferReceivedListener` methods are effectively not called in the previous commits anymore. I will make a later pass to see that all tests pass though. 
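As an aside on the `PrioritizedDeque` contract quoted above: the following is a minimal, self-contained sketch of the ordering it promises (priority elements are polled in insertion order, ahead of every non-priority element). It is not the PR's implementation. It swaps the single-`ArrayDeque`-plus-counter layout for two separate queues to keep the example short, and the class name `TwoQueuePrioritizedDeque` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration only; not the PrioritizedDeque from the PR. */
final class TwoQueuePrioritizedDeque<T> {
    // Assumption: two queues instead of the PR's single deque plus a counter.
    private final Deque<T> priority = new ArrayDeque<>();
    private final Deque<T> regular = new ArrayDeque<>();

    /** Queued behind existing priority elements, ahead of all regular ones. */
    void addPriorityElement(T element) {
        priority.addLast(element);
    }

    /** Queued behind everything else. */
    void add(T element) {
        regular.addLast(element);
    }

    /** Returns the next element, priority first, or null if empty. */
    T poll() {
        return priority.isEmpty() ? regular.poll() : priority.poll();
    }

    int getNumPriorityElements() {
        return priority.size();
    }

    int size() {
        return priority.size() + regular.size();
    }
}
```

Adding `a` and `b` as regular elements, then `p1` and `p2` as priority elements, and polling four times yields `p1`, `p2`, `a`, `b`. The trade-off against the PR's layout is that two queues make priority inserts cheap but make iterating all elements in poll order a separate step.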
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -314,6 +315,21 @@ protected StreamTask( } this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling")); + + injectChannelStateWriterIntoChannels(); + } + + private void injectChannelStateWriterIntoChannels() { + final Environment env = getEnvironment(); + final ChannelStateWriter channelStateWriter = subtaskCheckpointCoordinator.getChannelStateWriter(); + for (final InputGate gate : env.getAllInputGates()) { + gate.setChannelStateWriter(channelStateWriter); + } + for (ResultPartitionWriter writer : env.getAllWriters()) { + if (writer instanceof ChannelStateHolder) { + ((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter); + } + } Review comment: Happy for any other suggestion. I think the cast itself is in line with recent changes done by Stephan. The proper solution would be to inject in ctor but that will not happen until we merge `runtime` and `streaming`. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java ########## @@ -232,19 +232,16 @@ public int getInputIndex() { ChannelStateWriter channelStateWriter, long checkpointId) throws IOException { for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) { - final InputChannel channel = checkpointedInputGate.getChannel(channelIndex); - - // Assumption for retrieving buffers = one concurrent checkpoint RecordDeserializer<?> deserializer = recordDeserializers[channelIndex]; if (deserializer != null) { + final InputChannel channel = checkpointedInputGate.getChannel(channelIndex); + channelStateWriter.addInputData( checkpointId, channel.getChannelInfo(), ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, deserializer.getUnconsumedBuffer()); } - - checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter); Review comment: We discussed offline, but let me summarize the main point. When spilling in the main thread, buffers would be spilled rather late as you pointed out initially. One solution is to trigger the channel spilling in the task thread as soon as possible and then resume spilling from netty until the barrier arrives. However, that results in a complex threading model with lots of race conditions, as we found out in the version in master. Another solution is to spill in the task thread and use any poll to discover new buffers and spill them. It's slightly slower and also requires lots of internal knowledge at the `Unaligner` about the channels to work well (mark all spilled channels). It's probably also suboptimal as new buffers are usually enqueued right after the head is polled, so one buffer is enqueued in the channel but not persisted until the new head is polled. The proposed solution is to spill in the netty thread entirely. That's the fastest possible solution with a comparably easy threading model. 
The downside is the added complexity on the channel side, but the general idea is that the upstream and downstream sides of a channel are now self-contained. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { assert Thread.holdsLock(buffers); - if (insertAsHead) { - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); + if (!insertAsHead) { + buffers.add(bufferConsumer); + return; + } + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. - for (BufferConsumer buffer : buffers) { - try (BufferConsumer bc = buffer.copy()) { - if (bc.isBuffer()) { + final int pos = buffers.getNumPriorityElements(); + buffers.addPriorityElement(bufferConsumer); + + boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); + if (unalignedCheckpoint) { + final Iterator<BufferConsumer> iterator = buffers.iterator(); + Iterators.advance(iterator, pos + 1); + while (iterator.hasNext()) { + BufferConsumer buffer = iterator.next(); + + if (buffer.isBuffer()) { + try (BufferConsumer bc = buffer.copy()) { inflightBufferSnapshot.add(bc.build()); } } } + } + } - buffers.addFirst(bufferConsumer); - } else { - buffers.add(bufferConsumer); + private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) { + boolean unalignedCheckpoint; + try (BufferConsumer bc = bufferConsumer.copy()) { + Buffer buffer = bc.build(); + try { + final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + unalignedCheckpoint = event instanceof CheckpointBarrier; + } catch 
(IOException e) { + throw new IllegalStateException("Should always be able to deserialize in-memory event", e); + } finally { + buffer.recycleBuffer(); + } } + return unalignedCheckpoint; Review comment: Moved this method to the commit that spills immediately. We need it in that method to retrieve the checkpoint id to spill correctly. Deserialization is only necessary for priority events, which are very rare and rather cheap (30 bytes). I'd argue that adding a new call chain just to optimize it is not warranted. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { assert Thread.holdsLock(buffers); - if (insertAsHead) { - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); + if (!insertAsHead) { + buffers.add(bufferConsumer); + return; + } + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); - // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later. Review comment: Moved the removal to the commit that spills immediately. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) { buffers.add(bufferConsumer); return false; } - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); final int pos = buffers.getNumPriorityElements(); buffers.addPriorityElement(bufferConsumer); - boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); - if (unalignedCheckpoint) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + if (barrier != null) { + checkState( + barrier.getCheckpointOptions().isUnalignedCheckpoint(), + "Only unaligned checkpoints should be priority events"); final Iterator<BufferConsumer> iterator = buffers.iterator(); Iterators.advance(iterator, pos + 1); + List<Buffer> inflightBuffers = new ArrayList<>(); while (iterator.hasNext()) { BufferConsumer buffer = iterator.next(); if (buffer.isBuffer()) { try (BufferConsumer bc = buffer.copy()) { - inflightBufferSnapshot.add(bc.build()); + inflightBuffers.add(bc.build()); } } } + channelStateWriter.addOutputData( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + inflightBuffers.toArray(new Buffer[0])); } return pos == 0; } - private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) { - boolean unalignedCheckpoint; + @Nullable + private CheckpointBarrier parseCheckpointBarrier(BufferConsumer bufferConsumer) { Review comment: Hm I have a hard time coming up with a better code structure. I could add checkpoint parsing and the `if` in `addBuffer` already in the first commit `[FLINK-19026][network] Adding PrioritizedDeque and use it in PipelinedSubpartition.`. Then this diff would be only about persisting itself. But I was convinced that you would be confused why we need to parse the barrier at that commit. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ########## @@ -89,6 +92,14 @@ */ private final int[] inputGateChannelIndexOffsets; + /** + * The channel from which is currently polled, which allows interleaving of + * {@link #queueInputGate(IndexedInputGate, boolean)} and {@link #pollNext()} (FLINK-12510 (Deadlock when reading from InputGates)). + */ + @GuardedBy("inputGatesWithData") + @Nullable + private IndexedInputGate currentInputGate; + Review comment: Removed thanks to alternative fix of FLINK-12510 (see previous commit now). ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java ########## @@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws Exception { inputGate = createBarrierBuffer(2, sequence, validator); Review comment: > A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput. The changes to the tests are now handling the additionally emitted events. Imho tests are easier to read now (no magically disappearing buffers in the sequence). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ########## @@ -124,23 +127,22 @@ public boolean isAvailable() { } /** - * Check whether this reader is available or not (internal use, in sync with - * {@link #isAvailable()}, but slightly faster). + * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line. * - * <p>Returns true only if the next buffer is an event or the reader has both available + * <p>Returns the next data type only if the next buffer is an event or the reader has both available * credits and buffers. 
* * @param bufferAndBacklog * current buffer and backlog including information about the next buffer + * @return the next data type if the next buffer can be pulled immediately or null */ - private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) { // BEWARE: this must be in sync with #isAvailable()! Review comment: I expanded the comment (now javadoc) to clearly state the contract. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -779,34 +820,56 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) { })); } - private void queueChannel(InputChannel channel) { - int availableChannels; + private void queueChannel(InputChannel channel, boolean priority) { + CompletableFuture<?> toNotifyPriority = null; Review comment: Extracted into `GateNotificationHelper`, please check if it's actually helping to reduce complexity. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java ########## @@ -34,11 +30,8 @@ void notifyDataAvailable(); /** - * Allows the listener to react to a priority event before it is added to the outgoing buffer queue. - * - * @return true if the event has been fully processed and should not be added to the buffer queue. + * Called when the first priority event is added to the head of the buffer queue. */ - default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { - return false; + default void notifyPriorityEvent() { Review comment: Added a few thoughts. Let me know if it makes things clearer. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -147,6 +148,14 @@ else if (configuredInput instanceof SourceInputConfig) { return anyInputAvailable; } + @Override + public CompletableFuture<?> getPriorityEventAvailableFuture() { + return CompletableFuture.anyOf( + Arrays.stream(inputProcessors) + .map(inputProcessor -> inputProcessor.taskInput.getPriorityEventAvailableFuture()) + .toArray(CompletableFuture[]::new)); Review comment: Commit is removed. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ########## @@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) { buffers.add(bufferConsumer); return false; } - checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + - "checkpoints"); final int pos = buffers.getNumPriorityElements(); buffers.addPriorityElement(bufferConsumer); - boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer); - if (unalignedCheckpoint) { + CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer); + if (barrier != null) { + checkState( + barrier.getCheckpointOptions().isUnalignedCheckpoint(), + "Only unaligned checkpoints should be priority events"); final Iterator<BufferConsumer> iterator = buffers.iterator(); Iterators.advance(iterator, pos + 1); + List<Buffer> inflightBuffers = new ArrayList<>(); while (iterator.hasNext()) { BufferConsumer buffer = iterator.next(); if (buffer.isBuffer()) { try (BufferConsumer bc = buffer.copy()) { - inflightBufferSnapshot.add(bc.build()); + inflightBuffers.add(bc.build()); } } } + channelStateWriter.addOutputData( + barrier.getId(), + subpartitionInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + inflightBuffers.toArray(new Buffer[0])); Review comment: Not relevant in the final version where the channel spills by itself (no return value on 
this method). I can make it clearer in the commit message if you want. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ########## @@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt } Buffer buffer = next.buffer(); - CheckpointBarrier notifyReceivedBarrier = parseCheckpointBarrierOrNull(buffer); - if (notifyReceivedBarrier != null) { - receivedCheckpointId = notifyReceivedBarrier.getId(); - } else if (receivedCheckpointId < lastRequestedCheckpointId && buffer.isBuffer()) { - inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(), channelInfo); - } numBytesIn.inc(buffer.getSize()); numBuffersIn.inc(); + if (buffer.isBuffer()) { + for (final long barrierId : pendingCheckpointBarriers) { + channelStateWriter.addInputData( + barrierId, + getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); Review comment: `LocalInputChannel` only spills when it awaits a barrier. So it spills the buffer on first sight and it cannot be better on the downstream level. We could of course also move spilling lingering buffers to the upstream. It might also be an improvement for later. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ########## @@ -210,15 +221,25 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt } Buffer buffer = next.buffer(); - CheckpointBarrier notifyReceivedBarrier = parseCheckpointBarrierOrNull(buffer); - if (notifyReceivedBarrier != null) { - receivedCheckpointId = notifyReceivedBarrier.getId(); - } else if (receivedCheckpointId < lastRequestedCheckpointId && buffer.isBuffer()) { - inputGate.getBufferReceivedListener().notifyBufferReceived(buffer.retainBuffer(), channelInfo); - } numBytesIn.inc(buffer.getSize()); numBuffersIn.inc(); + if (buffer.isBuffer()) { + for (final long barrierId : pendingCheckpointBarriers) { + channelStateWriter.addInputData( + barrierId, + getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); Review comment: `LocalInputChannel` only spills when it awaits a barrier. So it spills the buffer on first sight and it cannot be better on the downstream level. We could of course also move spilling lingering buffers to the upstream. It might also be an improvement for later, but it adds quite a bit of complexity as barriers also need to be propagated upstream. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -89,17 +96,15 @@ /** The number of available buffers that have not been announced to the producer yet. */ private final AtomicInteger unannouncedCredit = new AtomicInteger(0); - /** - * The latest already triggered checkpoint id which would be updated during - * {@link #spillInflightBuffers(long, ChannelStateWriter)}. - */ - @GuardedBy("receivedBuffers") - private long lastRequestedCheckpointId = -1; + private final BufferManager bufferManager; - /** The current received checkpoint id from the network. 
*/ - private long receivedCheckpointId = -1; + /** Stores #overtaken buffers when a checkpoint barrier is received before task thread started checkpoint. */ + @GuardedBy("receivedBuffers") + private Map<Long, Integer> numBuffersOvertaken = new HashMap<>(); - private final BufferManager bufferManager; + /** All started checkpoints where a barrier has not been received yet. */ + @GuardedBy("receivedBuffers") + private Deque<Long> pendingCheckpointBarriers = new ArrayDeque<>(2); Review comment: Changed it to support only one concurrent checkpoint. I also extracted the whole logic into one helper class that can be used by both `LocalInputChannel` and `RemoteInputChannel` (with synchronization). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx } wasEmpty = receivedBuffers.isEmpty(); - receivedBuffers.add(buffer); - if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) { - notifyReceivedBuffer = buffer.retainBuffer(); + AbstractEvent priorityEvent = parsePriorityEvent(buffer); + if (priorityEvent != null) { + receivedBuffers.addPriorityElement(buffer); + final int pos = receivedBuffers.getNumPriorityElements(); + if (priorityEvent instanceof CheckpointBarrier) { + final long barrierId = ((CheckpointBarrier) priorityEvent).getId(); + // don't spill future buffers for this checkpoint + if (!pendingCheckpointBarriers.remove(barrierId)) { + // checkpoint was not yet started by task thread, + // so remember the numbers of buffers to spill for the time when it will be started + numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos); + } + } + firstPriorityEvent = pos == 1; } else { - notifyReceivedBuffer = null; + receivedBuffers.add(buffer); + if (buffer.isBuffer()) { + for (final long checkpointId : pendingCheckpointBarriers) { + 
channelStateWriter.addInputData( + checkpointId, + channelInfo, + sequenceNumber, + CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); + } + } Review comment: Much smaller now thanks to the helper class. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -454,42 +431,106 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx } wasEmpty = receivedBuffers.isEmpty(); - receivedBuffers.add(buffer); - if (listener != null && buffer.isBuffer() && receivedCheckpointId < lastRequestedCheckpointId) { - notifyReceivedBuffer = buffer.retainBuffer(); + AbstractEvent priorityEvent = parsePriorityEvent(buffer); + if (priorityEvent != null) { + receivedBuffers.addPriorityElement(buffer); + final int pos = receivedBuffers.getNumPriorityElements(); + if (priorityEvent instanceof CheckpointBarrier) { + final long barrierId = ((CheckpointBarrier) priorityEvent).getId(); + // don't spill future buffers for this checkpoint + if (!pendingCheckpointBarriers.remove(barrierId)) { + // checkpoint was not yet started by task thread, + // so remember the numbers of buffers to spill for the time when it will be started + numBuffersOvertaken.put(barrierId, receivedBuffers.size() - pos); + } + } + firstPriorityEvent = pos == 1; } else { - notifyReceivedBuffer = null; + receivedBuffers.add(buffer); + if (buffer.isBuffer()) { + for (final long checkpointId : pendingCheckpointBarriers) { + channelStateWriter.addInputData( + checkpointId, + channelInfo, + sequenceNumber, + CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); + } + } } - notifyReceivedBarrier = listener != null ? 
parseCheckpointBarrierOrNull(buffer) : null; } recycleBuffer = false; ++expectedSequenceNumber; + if (firstPriorityEvent) { + notifyPriorityEvent(); + } if (wasEmpty) { notifyChannelNonEmpty(); } if (backlog >= 0) { onSenderBacklog(backlog); } - - if (notifyReceivedBarrier != null) { - receivedCheckpointId = notifyReceivedBarrier.getId(); - if (notifyReceivedBarrier.isCheckpoint()) { - listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo); - } - } else if (notifyReceivedBuffer != null) { - listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo); - } } finally { if (recycleBuffer) { buffer.recycleBuffer(); } } } + /** + * Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only + * the overtaken buffers. + */ + public void checkpointStarted(CheckpointBarrier barrier) { + checkState(channelStateWriter != null, "Channel state writer not injected"); + synchronized (receivedBuffers) { + final Integer numBuffers = numBuffersOvertaken.get(barrier.getId()); + if (numBuffers != null) { + // already received barrier before the task thread picked up the barrier of this or another channel + spillBuffers(barrier.getId(), numBuffers); + } else { + // barrier not yet received, spill all current and future buffers + spillBuffers(barrier.getId(), receivedBuffers.getNumUnprioritizedElements()); + pendingCheckpointBarriers.add(barrier.getId()); + } + } + } + + public void checkpointStopped(long checkpointId) { + synchronized (receivedBuffers) { + numBuffersOvertaken.remove(checkpointId); + pendingCheckpointBarriers.remove(checkpointId); + } + } + + private void spillBuffers(long checkpointId, int numBuffers) { Review comment: I added a helper class `ChannelStatePersister` and used `persist` everywhere to have a clear separation of the two names. `Persister` is more on the logical level and `Writer` on the physical implementation. 
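To illustrate that split, here is a minimal sketch, assuming a single concurrent checkpoint: a logical persister that only decides whether a buffer still belongs to a pending checkpoint and delegates the actual write to a writer. `PersisterSketch`, `WriterSketch`, and the `String` buffers are hypothetical stand-ins, not Flink's `ChannelStatePersister` or `ChannelStateWriter`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical physical side: simply records what it is asked to write. */
final class WriterSketch {
    final List<String> written = new ArrayList<>();

    void addInputData(long checkpointId, String buffer) {
        written.add(checkpointId + ":" + buffer);
    }
}

/** Hypothetical logical side: tracks whether a checkpoint still awaits its barrier. */
final class PersisterSketch {
    private static final long CHECKPOINT_COMPLETED = -1;
    private static final long BARRIER_RECEIVED = -2;

    private final WriterSketch writer;
    private long pendingCheckpointId = CHECKPOINT_COMPLETED;

    PersisterSketch(WriterSketch writer) {
        this.writer = writer;
    }

    /** Persists the already queued buffers; keeps persisting unless the barrier overtook them. */
    void startPersisting(long checkpointId, List<String> knownBuffers) {
        if (pendingCheckpointId != BARRIER_RECEIVED) {
            pendingCheckpointId = checkpointId;
        }
        for (String buffer : knownBuffers) {
            writer.addInputData(checkpointId, buffer);
        }
    }

    /** Persists a newly arrived buffer only while a checkpoint is pending. */
    void maybePersist(String buffer) {
        if (pendingCheckpointId >= 0) {
            writer.addInputData(pendingCheckpointId, buffer);
        }
    }

    /** The barrier arrived: later buffers no longer belong to the checkpoint. */
    void barrierReceived() {
        pendingCheckpointId = BARRIER_RECEIVED;
    }

    void stopPersisting() {
        pendingCheckpointId = CHECKPOINT_COMPLETED;
    }
}
```

In this sketch, starting checkpoint 7 with queued buffers `a` and `b`, receiving `c`, then the barrier, then `d` results in `a`, `b`, and `c` being written for checkpoint 7, while `d` is skipped.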
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception { } InputStatus inputStatus = inputProcessors[readingInputIndex].processInput(); + updatePriorityAvailability(); checkFinished(inputStatus, readingInputIndex); return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex); } + private void updatePriorityAvailability() { + if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) { + final CompletableFuture<?> priorityEventAvailableFuture = + inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture(); + // no more priority events for the input + if (!priorityEventAvailableFuture.isDone()) { + prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex); + if (!prioritySelectionHandler.isAnyInputAvailable()) { + priorityAvailability.resetUnavailable(); + } + priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex)); + } + } + } + + private Runnable onPriorityEvent(int index) { + return () -> { + // set the priority flag in a mail before notifying StreamTask of availability + mainMailboxExecutor.execute(() -> { + prioritySelectionHandler.setAvailableInput(index); + priorityAvailability.getUnavailableToResetAvailable().complete(null); + }, "priority event {}", index); Review comment: Commit removed; on `CheckpointedInputGate`, I'm adding the gate.toString(). 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java ########## @@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception { } InputStatus inputStatus = inputProcessors[readingInputIndex].processInput(); + updatePriorityAvailability(); checkFinished(inputStatus, readingInputIndex); return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex); } + private void updatePriorityAvailability() { + if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) { + final CompletableFuture<?> priorityEventAvailableFuture = + inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture(); + // no more priority events for the input + if (!priorityEventAvailableFuture.isDone()) { + prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex); + if (!prioritySelectionHandler.isAnyInputAvailable()) { + priorityAvailability.resetUnavailable(); + } + priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex)); + } + } + } + + private Runnable onPriorityEvent(int index) { + return () -> { + // set the priority flag in a mail before notifying StreamTask of availability + mainMailboxExecutor.execute(() -> { + prioritySelectionHandler.setAvailableInput(index); + priorityAvailability.getUnavailableToResetAvailable().complete(null); Review comment: Moved it even further up towards `CheckpointedInputGate`. At this point, we need to make sure that a priority event is really at the top (hence the optimistic lock protocol for notification). 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -113,7 +113,11 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn if (++numBarriersReceived == numOpenChannels) { allBarriersReceivedFuture.complete(null); - resetPendingCheckpoint(barrierId); + for (final InputGate gate : inputGates) { + for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) { + gate.getChannel(index).checkpointStopped(currentCheckpointId); + } + } Review comment: Sorry that was just a test commit to see if the stuck e2e failed because of this change. I removed it. The original change is covered by a few unit tests already. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ########## @@ -361,4 +360,89 @@ public String toString() { '}'; } } + + /** + * Helper class for persisting channel state via {@link ChannelStateWriter}. + */ + @NotThreadSafe + protected final class ChannelStatePersister { + private static final long CHECKPOINT_COMPLETED = -1; + + private static final long BARRIER_RECEIVED = -2; + + /** All started checkpoints where a barrier has not been received yet. */ + private long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED; + + /** Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. 
*/ + @Nullable + private final ChannelStateWriter channelStateWriter; + + public ChannelStatePersister(@Nullable ChannelStateWriter channelStateWriter) { + this.channelStateWriter = channelStateWriter; + } + + protected void startPersisting(long barrierId, List<Buffer> knownBuffers) { + checkState(isInitialized(), "Channel state writer not injected"); + + if (pendingCheckpointBarrierId != BARRIER_RECEIVED) { + pendingCheckpointBarrierId = barrierId; + } + if (knownBuffers.size() > 0) { + channelStateWriter.addInputData( + barrierId, + channelInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + CloseableIterator.fromList(knownBuffers, Buffer::recycleBuffer)); + } + } + + protected boolean isInitialized() { + return channelStateWriter != null; + } + + protected void stopPersisting() { + pendingCheckpointBarrierId = CHECKPOINT_COMPLETED; + } + + protected void maybePersist(Buffer buffer) { + if (pendingCheckpointBarrierId >= 0 && buffer.isBuffer()) { + channelStateWriter.addInputData( + pendingCheckpointBarrierId, + getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + CloseableIterator.ofElement(buffer.retainBuffer(), Buffer::recycleBuffer)); + } + } + + protected boolean checkForBarrier(Buffer buffer) throws IOException { + final AbstractEvent priorityEvent = parsePriorityEvent(buffer); + if (priorityEvent instanceof CheckpointBarrier) { + pendingCheckpointBarrierId = BARRIER_RECEIVED; + return true; + } + return false; + } + + /** + * Parses the buffer as an event and returns the {@link CheckpointBarrier} if the event is indeed a barrier or + * returns null in all other cases. + */ + @Nullable + protected AbstractEvent parsePriorityEvent(Buffer buffer) throws IOException { + if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) { + return null; + } + + AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer. 
+ // we can further improve to avoid double deserialization in the future. + buffer.setReaderIndex(0); + return event; + } Review comment: It's only used in this class and thus I inlined it. As is, it really looks like an interface of the Persister. There is similar code for `BufferConsumer` on the output side, but the implementation is too different to align (copy buffer vs. reader index reset, different `EventSerializer.fromBuffer` overloads). ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java ########## @@ -120,67 +112,10 @@ public void testNoDataProcessedAfterCheckpointBarrier() throws Exception { assertEquals(0, output.getNumberOfEmittedRecords()); } - @Test - public void testSnapshotWithTwoInputGates() throws Exception { Review comment: I added this test for FLINK-18139 - input gate index issues in `Unaligner#hasInflightData`, which is called by `StreamTaskNetworkInput`, hence the test here. However, in this commit, `hasInflightData` is removed. Spilling is fully encapsulated in `InputChannel` + `CheckpointBarrierUnaligner`/`CheckpointedInputGate` and this interaction is already covered in tests at `CheckpointBarrierUnalignerTest`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ########## @@ -812,242 +788,13 @@ public void testQueuedBuffers() throws Exception { } } - @Test - public void testBufferReceivedListener() throws Exception { Review comment: Good catch, my intent was to delete `testBufferReceivedListener` and `testPartitionNotFoundExceptionWhileGetNextBuffer` but not the test in between them. `testBufferReceivedListener` tests `BufferReceivedListener`, which this commit renders useless (and which is later removed). `testPartitionNotFoundExceptionWhileGetNextBuffer` tests concurrent spilling of lingering buffers and receiving of such lingering buffers.
Both now happen in the same thread, so the test does not make any sense. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java ########## @@ -62,9 +65,37 @@ */ public CheckpointedInputGate( InputGate inputGate, - CheckpointBarrierHandler barrierHandler) { + CheckpointBarrierHandler barrierHandler, + MailboxExecutor mailboxExecutor) { this.inputGate = inputGate; this.barrierHandler = barrierHandler; + this.mailboxExecutor = mailboxExecutor; + + waitForPriorityEvents(inputGate, mailboxExecutor); + } + + /** + * Eagerly pulls and processes all priority events. Must be called from task thread. + * + * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}. + */ + private void processPriorityEvents() throws IOException, InterruptedException { + // check if the priority event is still not processed (could have been pulled before mail was being executed) + if (inputGate.getPriorityEventAvailableFuture().isDone()) { + // process as many priority events as possible + while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) { + } + } + Review comment: Good idea, I solved it in the following way:
```
// check if the priority event is still not processed (could have been pulled before mail was being executed)
boolean hasPriorityEvent = inputGate.getPriorityEventAvailableFuture().isDone();
while (hasPriorityEvent) {
	// process as many priority events as possible
	final Optional<BufferOrEvent> bufferOrEventOpt = pollNext();
	bufferOrEventOpt.ifPresent(bufferOrEvent ->
		checkState(bufferOrEvent.hasPriority(), "Should only poll priority events"));
	hasPriorityEvent = bufferOrEventOpt.map(BufferOrEvent::morePriorityEvents).orElse(false);
}
```
`checkState(!inputGate.getPriorityEventAvailableFuture().isDone())` might fail if netty receives a new priority event and completes this availability future while the task thread is polling the last priority event.
This case should happen quite often when the first barrier arrives (at that time the only priority event, morePriorityEvents = false) and triggers the whole checkpointing process. The second barrier would then complete the `getPriorityEventAvailableFuture` causing a more or less immediate re-execution of this method. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org