AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492481729



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -261,8 +260,7 @@ void registerSourceReader(ReaderInfo readerInfo) {
         * @param subtaskId the subtask id of the source reader.
         */
        void unregisterSourceReader(int subtaskId) {
-               Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format(
-                               "Failed to unregister source reader of id %s because it is not registered.", subtaskId));
+               registeredReaders.remove(subtaskId);

Review comment:
       Added a ticket and referenced it properly: https://issues.apache.org/jira/browse/FLINK-19338.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
        }
 
        /**
-        * Check whether this reader is available or not (internal use, in sync with
-        * {@link #isAvailable()}, but slightly faster).
+        * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
         *
-        * <p>Returns true only if the next buffer is an event or the reader has both available
+        * <p>Returns the next data type only if the next buffer is an event or the reader has both available
         * credits and buffers.
         *
         * @param bufferAndBacklog
         *              current buffer and backlog including information about the next buffer
+        * @return the next data type if the next buffer can be pulled immediately or null
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {

Review comment:
       I added an implNote in the javadoc.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * A deque-like data structure that supports prioritization of elements, such that they will be polled before any
+ * non-priority elements.
+ *
+ * <p>{@implNote The current implementation deliberately does not implement the respective interface to minimize the maintenance
+ * effort. Furthermore, it's optimized for handling non-priority elements, such that all operations for adding priority
+ * elements are much slower than their non-priority counterparts.}
+ *
+ * <p>Note that all element tests are performed by identity.
+ *
+ * @param <T> the element type.
+ */
+@Internal
+public final class PrioritizedDeque<T> implements Iterable<T> {
+       private final Deque<T> deque = new ArrayDeque<>();
+       private int numPriorityElements;
+
+       /**
+        * Adds a priority element to this deque, such that it will be polled after all existing priority elements but
+        * before any non-priority element.
+        *
+        * @param element the element to add
+        */
+       public void addPriorityElement(T element) {
+               // priority elements are rather rare and short-lived, so most of the time there are none
+               if (numPriorityElements == 0) {
+                       deque.addFirst(element);
+               } else if (numPriorityElements == deque.size()) {
+                       // no non-priority elements
+                       deque.add(element);
+               } else {
+                       // remove all priority elements
+                       final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements);
+                       for (int index = 0; index < numPriorityElements; index++) {
+                               priorPriority.addFirst(deque.poll());
+                       }
+                       deque.addFirst(element);
+                       // re-add them before the newly added element
+                       for (final T priorityEvent : priorPriority) {
+                               deque.addFirst(priorityEvent);
+                       }
+               }
+               numPriorityElements++;
+       }
+
+       /**
+        * Adds a non-priority element to this deque, which will be polled last.
+        *
+        * @param element the element to add
+        */
+       public void add(T element) {
+               deque.add(element);
+       }
+
+       /**
+        * Convenience method for adding an element with optional priority and prior removal.
+        *
+        * @param element the element to add
+        * @param priority flag indicating if it's a priority or non-priority element
+        * @param alreadyContained flag that hints that the element is already in this deque, potentially as a non-priority element.
+        */
+       public void add(T element, boolean priority, boolean alreadyContained) {

Review comment:
       I'm moving it to the commit that starts using it.
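   For illustration, the polling order guaranteed by `addPriorityElement` can be demonstrated with a small standalone sketch. This is a simplified, hypothetical re-implementation of the semantics above, not the actual `org.apache.flink.runtime.io.network.partition.PrioritizedDeque` class:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical sketch of the PrioritizedDeque semantics:
// priority elements are polled FIFO among themselves, but always before
// any non-priority element.
public class Main {
    static final Deque<String> deque = new ArrayDeque<>();
    static int numPriorityElements;
    static String lastPollOrder = "";

    static void add(String element) {
        deque.add(element); // non-priority: polled last
    }

    static void addPriorityElement(String element) {
        if (numPriorityElements == 0) {
            deque.addFirst(element);
        } else if (numPriorityElements == deque.size()) {
            deque.add(element); // only priority elements present
        } else {
            // temporarily remove the existing priority elements ...
            Deque<String> priorPriority = new ArrayDeque<>(numPriorityElements);
            for (int i = 0; i < numPriorityElements; i++) {
                priorPriority.addFirst(deque.poll());
            }
            deque.addFirst(element);
            // ... and re-add them before the newly added element
            for (String prior : priorPriority) {
                deque.addFirst(prior);
            }
        }
        numPriorityElements++;
    }

    static String poll() {
        if (numPriorityElements > 0) {
            numPriorityElements--;
        }
        return deque.poll();
    }

    public static void main(String[] args) {
        add("a");
        add("b");
        addPriorityElement("p1"); // jumps ahead of a and b
        addPriorityElement("p2"); // after p1, still ahead of a and b
        StringBuilder order = new StringBuilder();
        while (!deque.isEmpty()) {
            order.append(poll()).append(' ');
        }
        lastPollOrder = order.toString().trim();
        System.out.println(lastPollOrder); // p1 p2 a b
    }
}
```

   Note how the common cases (no priority elements, or only priority elements) stay O(1), while inserting a priority element behind other priority elements pays the shuffling cost, matching the trade-off stated in the implNote.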

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -22,13 +22,10 @@
 import org.apache.flink.annotation.VisibleForTesting;

Review comment:
       I admit that it's an awkward cut.
   
   However, it's only spilled in one place, as the `BufferReceivedListener` methods are effectively no longer called after the previous commits. I will make a later pass to ensure that all tests still pass, though.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -314,6 +315,21 @@ protected StreamTask(
                }
 
                this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling"));
+
+               injectChannelStateWriterIntoChannels();
+       }
+
+       private void injectChannelStateWriterIntoChannels() {
+               final Environment env = getEnvironment();
+               final ChannelStateWriter channelStateWriter = subtaskCheckpointCoordinator.getChannelStateWriter();
+               for (final InputGate gate : env.getAllInputGates()) {
+                       gate.setChannelStateWriter(channelStateWriter);
+               }
+               for (ResultPartitionWriter writer : env.getAllWriters()) {
+                       if (writer instanceof ChannelStateHolder) {
+                               ((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
+                       }
+               }

Review comment:
       Happy for any other suggestion. I think the cast itself is in line with recent changes done by Stephan.
   
   The proper solution would be to inject it in the constructor, but that will not happen until we merge `runtime` and `streaming`.
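   The instanceof-guarded setter injection from the diff can be sketched in isolation. The names below mirror the diff, but the interfaces are simplified, hypothetical stand-ins rather than Flink's real types:

```java
// Hypothetical sketch of instanceof-based setter injection: the writer is
// injected after construction into every component that opts in via a
// holder interface, and everything else is skipped.
public class Main {
    interface ChannelStateWriter { }
    interface ChannelStateHolder { void setChannelStateWriter(ChannelStateWriter w); }

    static class NoOpWriter implements ChannelStateWriter { }

    // A partition-writer-like component that supports channel state.
    static class StatefulWriter implements ChannelStateHolder {
        ChannelStateWriter channelStateWriter;
        public void setChannelStateWriter(ChannelStateWriter w) { channelStateWriter = w; }
    }

    // A component without channel state; it is simply skipped.
    static class PlainWriter { }

    // Injects the writer into every holder and returns how many were injected.
    static int injectAll(Object[] writers, ChannelStateWriter writer) {
        int injected = 0;
        for (Object w : writers) {
            if (w instanceof ChannelStateHolder) {
                ((ChannelStateHolder) w).setChannelStateWriter(writer);
                injected++;
            }
        }
        return injected;
    }

    public static void main(String[] args) {
        Object[] writers = { new PlainWriter(), new StatefulWriter() };
        System.out.println(injectAll(writers, new NoOpWriter())); // 1
    }
}
```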

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -232,19 +232,16 @@ public int getInputIndex() {
                        ChannelStateWriter channelStateWriter,
                        long checkpointId) throws IOException {
                for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
-                       final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
-
-                       // Assumption for retrieving buffers = one concurrent checkpoint
                        RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
                        if (deserializer != null) {
+                               final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
+
                                channelStateWriter.addInputData(
                                        checkpointId,
                                        channel.getChannelInfo(),
                                        ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                                        deserializer.getUnconsumedBuffer());
                        }
-
-                       checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);

Review comment:
       We discussed offline, but let me summarize the main point. When spilling in the main thread, buffers would be spilled rather late, as you pointed out initially.
   One solution is to trigger the channel spilling in the task thread as soon as possible and then resume spilling from netty until the barrier arrives. However, that results in a complex threading model with lots of race conditions, as we found out in the version in master.
   Another solution is to spill in the task thread and use any poll to discover new buffers and spill them. It's slightly slower and also requires lots of internal knowledge about the channels at the `Unaligner` to work well (mark all spilled channels). It's probably also suboptimal, as new buffers are usually enqueued right after the head is polled, so one buffer is enqueued in the channel but not persisted until the new head is polled.
   The proposed solution is to spill entirely in the netty thread. That's the fastest possible solution with a comparably easy threading model. The downside is the added complexity on the channel side, but the general idea is that the upstream and downstream sides of a channel are now self-contained.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
 
        private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
                assert Thread.holdsLock(buffers);
-               if (insertAsHead) {
-                       checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                               "checkpoints");
+               if (!insertAsHead) {
+                       buffers.add(bufferConsumer);
+                       return;
+               }
+               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+                       "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
-                       for (BufferConsumer buffer : buffers) {
-                               try (BufferConsumer bc = buffer.copy()) {
-                                       if (bc.isBuffer()) {
+               final int pos = buffers.getNumPriorityElements();
+               buffers.addPriorityElement(bufferConsumer);
+
+               boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
+               if (unalignedCheckpoint) {
+                       final Iterator<BufferConsumer> iterator = buffers.iterator();
+                       Iterators.advance(iterator, pos + 1);
+                       while (iterator.hasNext()) {
+                               BufferConsumer buffer = iterator.next();
+
+                               if (buffer.isBuffer()) {
+                                       try (BufferConsumer bc = buffer.copy()) {
                                                inflightBufferSnapshot.add(bc.build());
                                        }
                                }
                        }
+               }
+       }
 
-                       buffers.addFirst(bufferConsumer);
-               } else {
-                       buffers.add(bufferConsumer);
+       private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
+               boolean unalignedCheckpoint;
+               try (BufferConsumer bc = bufferConsumer.copy()) {
+                       Buffer buffer = bc.build();
+                       try {
+                               final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                               unalignedCheckpoint = event instanceof CheckpointBarrier;
+                       } catch (IOException e) {
+                               throw new IllegalStateException("Should always be able to deserialize in-memory event", e);
+                       } finally {
+                               buffer.recycleBuffer();
+                       }
                }
+               return unalignedCheckpoint;

Review comment:
       Moved this method to the commit that spills immediately. We need it in that method to retrieve the checkpoint id in order to spill correctly.
   
   Deserialization is only necessary for priority events, which are very rare and rather cheap to deserialize (~30 bytes). I'd argue that adding a new call chain just to optimize it is not warranted.
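   To illustrate why this is cheap: a priority event is only a handful of bytes, so checking whether a serialized blob is a barrier boils down to reading a small header. The following is a hypothetical standalone sketch of that pattern; the tag values and layout are made up for illustration and do not match Flink's actual `EventSerializer` wire format:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the "peek at a serialized event" pattern: a tiny
// header identifies the event type, so parsing on the hot path is cheap.
public class Main {
    static final int CHECKPOINT_BARRIER_TAG = 0; // made-up tag value

    // Serialize a minimal "barrier" event: 4-byte tag + 8-byte checkpoint id.
    static ByteBuffer serializeBarrier(long checkpointId) {
        ByteBuffer buf = ByteBuffer.allocate(12);
        buf.putInt(CHECKPOINT_BARRIER_TAG);
        buf.putLong(checkpointId);
        buf.flip();
        return buf;
    }

    // Returns the checkpoint id if the buffer holds a barrier, or null
    // otherwise, mirroring the nullable parseCheckpointBarrier(...) shape
    // from the diff.
    static Long parseCheckpointBarrier(ByteBuffer buf) {
        ByteBuffer copy = buf.duplicate(); // don't disturb the reader's position
        if (copy.remaining() < 12 || copy.getInt() != CHECKPOINT_BARRIER_TAG) {
            return null;
        }
        return copy.getLong();
    }

    public static void main(String[] args) {
        ByteBuffer barrier = serializeBarrier(42L);
        System.out.println(parseCheckpointBarrier(barrier)); // 42
    }
}
```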

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
 
        private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
                assert Thread.holdsLock(buffers);
-               if (insertAsHead) {
-                       checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                               "checkpoints");
+               if (!insertAsHead) {
+                       buffers.add(bufferConsumer);
+                       return;
+               }
+               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+                       "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.

Review comment:
       Moved the removal to the commit that spills immediately.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
                        buffers.add(bufferConsumer);
                        return false;
                }
-               checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-                       "checkpoints");
 
                final int pos = buffers.getNumPriorityElements();
                buffers.addPriorityElement(bufferConsumer);
 
-               boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
-               if (unalignedCheckpoint) {
+               CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+               if (barrier != null) {
+                       checkState(
+                               barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+                               "Only unaligned checkpoints should be priority events");
+                       final Iterator<BufferConsumer> iterator = buffers.iterator();
                        Iterators.advance(iterator, pos + 1);
+                       List<Buffer> inflightBuffers = new ArrayList<>();
                        while (iterator.hasNext()) {
                                BufferConsumer buffer = iterator.next();
 
                                if (buffer.isBuffer()) {
                                        try (BufferConsumer bc = buffer.copy()) {
-                                               inflightBufferSnapshot.add(bc.build());
+                                               inflightBuffers.add(bc.build());
                                        }
                                }
                        }
+                       channelStateWriter.addOutputData(
+                               barrier.getId(),
+                               subpartitionInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               inflightBuffers.toArray(new Buffer[0]));
                }
                return pos == 0;
        }
 
-       private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
-               boolean unalignedCheckpoint;
+       @Nullable
+       private CheckpointBarrier parseCheckpointBarrier(BufferConsumer bufferConsumer) {

Review comment:
       Hm, I have a hard time coming up with a better code structure. I could add the checkpoint parsing and the `if` in `addBuffer` already in the first commit, `[FLINK-19026][network] Adding PrioritizedDeque and use it in PipelinedSubpartition.`. Then this diff would only be about the persisting itself.
   
   But I was convinced that you would then be confused about why we need to parse the barrier already in that commit.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -89,6 +92,14 @@
         */
        private final int[] inputGateChannelIndexOffsets;
 
+       /**
+        * The input gate that is currently polled from, which allows interleaving of
+        * {@link #queueInputGate(IndexedInputGate, boolean)} and {@link #pollNext()} (FLINK-12510: Deadlock when reading from InputGates).
+        */
+       @GuardedBy("inputGatesWithData")
+       @Nullable
+       private IndexedInputGate currentInputGate;
+

Review comment:
       Removed, thanks to the alternative fix of FLINK-12510 (see the previous commit now).

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
##########
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws Exception {
                inputGate = createBarrierBuffer(2, sequence, validator);

Review comment:
       > A side-effect of this commit is that all events are handed over from CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop. However, since events are rare, it should have no visible impact on the throughput.
   
   The changes to the tests now handle the additionally emitted events. Imho the tests are easier to read now (no magically disappearing buffers in the sequence).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
        }
 
        /**
-        * Check whether this reader is available or not (internal use, in sync with
-        * {@link #isAvailable()}, but slightly faster).
+        * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
         *
-        * <p>Returns true only if the next buffer is an event or the reader has both available
+        * <p>Returns the next data type only if the next buffer is an event or the reader has both available
         * credits and buffers.
         *
         * @param bufferAndBacklog
         *              current buffer and backlog including information about the next buffer
+        * @return the next data type if the next buffer can be pulled immediately or null
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {
                // BEWARE: this must be in sync with #isAvailable()!

Review comment:
       I expanded the comment (now javadoc) to clearly state the contract.
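   The contract the javadoc spells out can be condensed into a small decision function. This is a hypothetical standalone sketch; the real method derives these inputs from the `BufferAndBacklog` and the reader's credit state:

```java
// Hypothetical condensation of the availability contract described above:
// an event can always be pulled; a data buffer only if the reader has both
// credits and further buffers; otherwise nothing is available (null).
public class Main {
    enum DataType { EVENT_BUFFER, DATA_BUFFER }

    static DataType getNextDataType(boolean nextIsEvent, int numCreditsAvailable, boolean moreBuffersAvailable) {
        if (nextIsEvent) {
            return DataType.EVENT_BUFFER; // events don't consume credits
        }
        if (numCreditsAvailable > 0 && moreBuffersAvailable) {
            return DataType.DATA_BUFFER;
        }
        return null; // nothing can be pulled immediately
    }

    public static void main(String[] args) {
        System.out.println(getNextDataType(true, 0, false));  // EVENT_BUFFER
        System.out.println(getNextDataType(false, 2, true));  // DATA_BUFFER
        System.out.println(getNextDataType(false, 0, true));  // null
    }
}
```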




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

