AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492481729
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -261,8 +260,7 @@ void registerSourceReader(ReaderInfo readerInfo) {
* @param subtaskId the subtask id of the source reader.
*/
void unregisterSourceReader(int subtaskId) {
-		Preconditions.checkNotNull(registeredReaders.remove(subtaskId), String.format(
-			"Failed to unregister source reader of id %s because it is not registered.", subtaskId));
+		registeredReaders.remove(subtaskId);
Review comment:
Added a ticket and referenced it properly: https://issues.apache.org/jira/browse/FLINK-19338.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
}
/**
-	 * Check whether this reader is available or not (internal use, in sync with
-	 * {@link #isAvailable()}, but slightly faster).
+	 * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
	 *
-	 * <p>Returns true only if the next buffer is an event or the reader has both available
+	 * <p>Returns the next data type only if the next buffer is an event or the reader has both available
	 * credits and buffers.
	 *
	 * @param bufferAndBacklog
	 *		current buffer and backlog including information about the next buffer
+	 * @return the next data type if the next buffer can be pulled immediately or null
	 */
-	private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+	private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {
Review comment:
I added an `@implNote` to the javadoc.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * A deque-like data structure that supports prioritization of elements, such that they will be polled before any
+ * non-priority elements.
+ *
+ * <p>{@implNote The current implementation deliberately does not implement the respective interface to minimize the
+ * maintenance effort. Furthermore, it's optimized for handling non-priority elements, such that all operations for
+ * adding priority elements are much slower than the non-priority counterparts.}
+ *
+ * <p>Note that all element tests are performed by identity.
+ *
+ * @param <T> the element type.
+ */
+@Internal
+public final class PrioritizedDeque<T> implements Iterable<T> {
+ private final Deque<T> deque = new ArrayDeque<>();
+ private int numPriorityElements;
+
+	/**
+	 * Adds a priority element to this deque, such that it will be polled after all existing priority elements but
+	 * before any non-priority element.
+	 *
+	 * @param element the element to add
+	 */
+	public void addPriorityElement(T element) {
+		// priority elements are rather rare and short-lived, so most of the time there are none
+		if (numPriorityElements == 0) {
+			deque.addFirst(element);
+		} else if (numPriorityElements == deque.size()) {
+			// no non-priority elements
+			deque.add(element);
+		} else {
+			// remove all priority elements
+			final ArrayDeque<T> priorPriority = new ArrayDeque<>(numPriorityElements);
+			for (int index = 0; index < numPriorityElements; index++) {
+				priorPriority.addFirst(deque.poll());
+			}
+			deque.addFirst(element);
+			// re-add them before the newly added element
+			for (final T priorityEvent : priorPriority) {
+				deque.addFirst(priorityEvent);
+			}
+		}
+		numPriorityElements++;
+	}
+
+ /**
+ * Adds a non-priority element to this deque, which will be polled last.
+ *
+ * @param element the element to add
+ */
+ public void add(T element) {
+ deque.add(element);
+ }
+
+	/**
+	 * Convenience method for adding an element with optional priority and prior removal.
+	 *
+	 * @param element the element to add
+	 * @param priority flag indicating if it's a priority or non-priority element
+	 * @param alreadyContained flag that hints that the element is already in this deque, potentially as non-priority element.
+	 */
+	public void add(T element, boolean priority, boolean alreadyContained) {
Review comment:
I'm moving it to the commit that starts using it.
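The ordering contract of `PrioritizedDeque` can be illustrated with a self-contained sketch (the class name `PrioritizedDequeSketch` and the `drain` helper are hypothetical simplifications; the real class additionally supports peeking, removal, and iteration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class PrioritizedDequeSketch<T> {
	private final Deque<T> deque = new ArrayDeque<>();
	private int numPriorityElements;

	/** Polled after all existing priority elements but before any non-priority element. */
	void addPriorityElement(T element) {
		if (numPriorityElements == 0) {
			// fast path: no priority elements yet
			deque.addFirst(element);
		} else if (numPriorityElements == deque.size()) {
			// only priority elements present, append at the end
			deque.add(element);
		} else {
			// temporarily remove the existing priority elements, insert, then re-add them in front
			Deque<T> prior = new ArrayDeque<>(numPriorityElements);
			for (int i = 0; i < numPriorityElements; i++) {
				prior.addFirst(deque.poll());
			}
			deque.addFirst(element);
			for (T p : prior) {
				deque.addFirst(p);
			}
		}
		numPriorityElements++;
	}

	/** Adds a non-priority element, which will be polled last. */
	void add(T element) {
		deque.add(element);
	}

	T poll() {
		if (numPriorityElements > 0) {
			numPriorityElements--;
		}
		return deque.poll();
	}

	/** Drains the deque, returning the elements in poll order. */
	List<T> drain() {
		List<T> out = new ArrayList<>();
		while (!deque.isEmpty()) {
			out.add(poll());
		}
		return out;
	}
}
```

Adding `a`, `b`, then priority elements `P1`, `P2`, then `c` yields the poll order `P1, P2, a, b, c`: priority elements jump ahead of all non-priority elements but stay FIFO among themselves.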
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -22,13 +22,10 @@
import org.apache.flink.annotation.VisibleForTesting;
Review comment:
I admit that it's an awkward cut.
However, it's only spilled in one place as the `BufferReceivedListener`
methods are effectively not called in the previous commits anymore. I will make
a later pass to see that all tests pass though.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -314,6 +315,21 @@ protected StreamTask(
}
		this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling"));
+
+		injectChannelStateWriterIntoChannels();
+	}
+
+	private void injectChannelStateWriterIntoChannels() {
+		final Environment env = getEnvironment();
+		final ChannelStateWriter channelStateWriter = subtaskCheckpointCoordinator.getChannelStateWriter();
+		for (final InputGate gate : env.getAllInputGates()) {
+			gate.setChannelStateWriter(channelStateWriter);
+		}
+		for (ResultPartitionWriter writer : env.getAllWriters()) {
+			if (writer instanceof ChannelStateHolder) {
+				((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
+			}
+		}
Review comment:
Happy for any other suggestion. I think the cast itself is in line with
recent changes done by Stephan.
The proper solution would be to inject it via the constructor, but that will
not happen until we merge `runtime` and `streaming`.
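The instanceof-based setter injection used here can be sketched in isolation (all names below are hypothetical stand-ins, not Flink API): components that need the writer implement a holder interface, and the task injects it after construction precisely because constructor injection is not yet possible.

```java
import java.util.ArrayList;
import java.util.List;

interface StateWriter {
	void write(String data);
}

/** Marker-style interface: only implementors receive the writer after construction. */
interface StateWriterHolder {
	void setStateWriter(StateWriter writer);
}

/** A writer that does not need channel state; it is simply skipped during injection. */
final class PlainWriter {
}

final class CheckpointingWriter implements StateWriterHolder {
	StateWriter stateWriter;

	@Override
	public void setStateWriter(StateWriter writer) {
		this.stateWriter = writer;
	}
}

final class InjectionDemo {
	/** Injects the writer into every holder; returns how many writers received it. */
	static int inject(List<Object> writers, StateWriter stateWriter) {
		int injected = 0;
		for (Object writer : writers) {
			if (writer instanceof StateWriterHolder) {
				((StateWriterHolder) writer).setStateWriter(stateWriter);
				injected++;
			}
		}
		return injected;
	}
}
```

The cast is safe because it is guarded by the `instanceof` check; non-holders pass through untouched.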
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -232,19 +232,16 @@ public int getInputIndex() {
ChannelStateWriter channelStateWriter,
long checkpointId) throws IOException {
for (int channelIndex = 0; channelIndex <
recordDeserializers.length; channelIndex++) {
- final InputChannel channel =
checkpointedInputGate.getChannel(channelIndex);
-
- // Assumption for retrieving buffers = one concurrent
checkpoint
RecordDeserializer<?> deserializer =
recordDeserializers[channelIndex];
if (deserializer != null) {
+ final InputChannel channel =
checkpointedInputGate.getChannel(channelIndex);
+
channelStateWriter.addInputData(
checkpointId,
channel.getChannelInfo(),
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
deserializer.getUnconsumedBuffer());
}
-
-
checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex,
channelStateWriter);
Review comment:
We discussed offline, but let me summarize the main point. When spilling
in the main thread, buffers would be spilled rather late, as you pointed out
initially.
One solution is to trigger the channel spilling in the task thread as soon as
possible and then resume spilling from netty until the barrier arrives. However,
that results in a complex threading model with lots of race conditions, as we
found out in the version in master.
Another solution is to spill in the task thread and use any poll to discover new
buffers and spill them. It's slightly slower and also requires lots of internal
knowledge at the `Unaligner` about the channels to work well (marking all spilled
channels). It's probably also suboptimal, as new buffers are usually enqueued
right after the head is polled, so one buffer is enqueued in the channel but
not persisted until the new head is polled.
The proposed solution is to spill in the netty thread entirely. That's the
fastest possible solution with a comparably easy threading model. The downside is
the added complexity on the channel side, but the general idea is that the upstream
and downstream sides of a channel are now self-contained.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
	private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
		assert Thread.holdsLock(buffers);
-		if (insertAsHead) {
-			checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-				"checkpoints");
+		if (!insertAsHead) {
+			buffers.add(bufferConsumer);
+			return;
+		}
+		checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+			"checkpoints");

-			// Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
-			for (BufferConsumer buffer : buffers) {
-				try (BufferConsumer bc = buffer.copy()) {
-					if (bc.isBuffer()) {
+		final int pos = buffers.getNumPriorityElements();
+		buffers.addPriorityElement(bufferConsumer);
+
+		boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
+		if (unalignedCheckpoint) {
+			final Iterator<BufferConsumer> iterator = buffers.iterator();
+			Iterators.advance(iterator, pos + 1);
+			while (iterator.hasNext()) {
+				BufferConsumer buffer = iterator.next();
+
+				if (buffer.isBuffer()) {
+					try (BufferConsumer bc = buffer.copy()) {
						inflightBufferSnapshot.add(bc.build());
					}
				}
			}
+		}
+	}

-		buffers.addFirst(bufferConsumer);
-	} else {
-		buffers.add(bufferConsumer);
+	private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
+		boolean unalignedCheckpoint;
+		try (BufferConsumer bc = bufferConsumer.copy()) {
+			Buffer buffer = bc.build();
+			try {
+				final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+				unalignedCheckpoint = event instanceof CheckpointBarrier;
+			} catch (IOException e) {
+				throw new IllegalStateException("Should always be able to deserialize in-memory event", e);
+			} finally {
+				buffer.recycleBuffer();
+			}
		}
+		return unalignedCheckpoint;
Review comment:
Moved this method to the commit that spills immediately. We need it in
that method to retrieve the checkpoint id to spill correctly.
Deserialization is only necessary for priority events, which are very rare
and rather cheap (30 bytes). I'd argue that adding a new call chain just to
optimize it is not warranted.
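To illustrate why the deserialization stays cheap, here is a self-contained sketch of the parse-on-demand idea (a hypothetical byte format, not Flink's `EventSerializer`): only buffers tagged as events are inspected, and parsing works on a copy so the original read position is untouched, mirroring the `bufferConsumer.copy()` above.

```java
import java.nio.ByteBuffer;

final class BarrierParser {
	static final byte DATA = 0;
	static final byte CHECKPOINT_BARRIER = 1;

	/** Returns the checkpoint id if the buffer is a barrier, or -1 for plain data. */
	static long parseBarrierId(ByteBuffer buffer) {
		// read from a duplicate so the original buffer's position is untouched
		ByteBuffer copy = buffer.duplicate();
		if (copy.get() != CHECKPOINT_BARRIER) {
			// data buffers are never deserialized
			return -1L;
		}
		// cheap: a barrier payload is only a few bytes
		return copy.getLong();
	}

	/** Encodes a barrier: one tag byte followed by the checkpoint id. */
	static ByteBuffer barrier(long checkpointId) {
		return (ByteBuffer) ByteBuffer.allocate(9).put(CHECKPOINT_BARRIER).putLong(checkpointId).flip();
	}

	/** Encodes a plain data buffer: one tag byte followed by the payload. */
	static ByteBuffer data(byte[] payload) {
		return (ByteBuffer) ByteBuffer.allocate(1 + payload.length).put(DATA).put(payload).flip();
	}
}
```

The tag check is the fast path taken by the vast majority of buffers; the (slightly) more expensive parse only runs for the rare event buffers.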
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -167,23 +171,46 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean inser
	private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
		assert Thread.holdsLock(buffers);
-		if (insertAsHead) {
-			checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-				"checkpoints");
+		if (!insertAsHead) {
+			buffers.add(bufferConsumer);
+			return;
+		}
+		checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
+			"checkpoints");

-		// Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
Review comment:
Moved the removal to the commit that spills immediately.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -173,50 +181,51 @@ private boolean addBuffer(BufferConsumer bufferConsumer) {
			buffers.add(bufferConsumer);
			return false;
		}
-		checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
-			"checkpoints");
		final int pos = buffers.getNumPriorityElements();
		buffers.addPriorityElement(bufferConsumer);
-		boolean unalignedCheckpoint = isUnalignedCheckpoint(bufferConsumer);
-		if (unalignedCheckpoint) {
+		CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+		if (barrier != null) {
+			checkState(
+				barrier.getCheckpointOptions().isUnalignedCheckpoint(),
+				"Only unaligned checkpoints should be priority events");
			final Iterator<BufferConsumer> iterator = buffers.iterator();
			Iterators.advance(iterator, pos + 1);
+			List<Buffer> inflightBuffers = new ArrayList<>();
			while (iterator.hasNext()) {
				BufferConsumer buffer = iterator.next();
				if (buffer.isBuffer()) {
					try (BufferConsumer bc = buffer.copy()) {
-						inflightBufferSnapshot.add(bc.build());
+						inflightBuffers.add(bc.build());
					}
				}
			}
+			channelStateWriter.addOutputData(
+				barrier.getId(),
+				subpartitionInfo,
+				ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+				inflightBuffers.toArray(new Buffer[0]));
		}
		return pos == 0;
	}
-	private boolean isUnalignedCheckpoint(BufferConsumer bufferConsumer) {
-		boolean unalignedCheckpoint;
+	@Nullable
+	private CheckpointBarrier parseCheckpointBarrier(BufferConsumer bufferConsumer) {
Review comment:
Hm, I have a hard time coming up with a better code structure. I could
add the checkpoint parsing and the `if` in `addBuffer` already in the first commit
`[FLINK-19026][network] Adding PrioritizedDeque and use it in
PipelinedSubpartition.`. Then this diff would only be about the persisting itself.
But I was convinced that you would be confused about why we need to parse the
barrier in that commit.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
##########
@@ -89,6 +92,14 @@
*/
private final int[] inputGateChannelIndexOffsets;
+	/**
+	 * The channel from which is currently polled, which allows interleaving of
+	 * {@link #queueInputGate(IndexedInputGate, boolean)} and {@link #pollNext()} (FLINK-12510 (Deadlock when reading from InputGates)).
+	 */
+	@GuardedBy("inputGatesWithData")
+	@Nullable
+	private IndexedInputGate currentInputGate;
+
+
Review comment:
Removed, thanks to the alternative fix of FLINK-12510 (now in the previous
commit).
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
##########
@@ -369,9 +369,7 @@ public void testMissingCancellationBarriers() throws
Exception {
inputGate = createBarrierBuffer(2, sequence, validator);
Review comment:
> A side-effect of this commit is that all events are handed over from
> CheckpointedInputGate to StreamTaskNetworkInput and break up the poll loop.
> However, since events are rare, it should have no visible impact on the
> throughput.

The changes to the tests are now handling the additionally emitted events.
Imho the tests are easier to read now (no magically disappearing buffers in the
sequence).
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -124,23 +127,22 @@ public boolean isAvailable() {
}
/**
-	 * Check whether this reader is available or not (internal use, in sync with
-	 * {@link #isAvailable()}, but slightly faster).
+	 * Returns the {@link org.apache.flink.runtime.io.network.buffer.Buffer.DataType} of the next buffer in line.
	 *
-	 * <p>Returns true only if the next buffer is an event or the reader has both available
+	 * <p>Returns the next data type only if the next buffer is an event or the reader has both available
	 * credits and buffers.
	 *
	 * @param bufferAndBacklog
	 *		current buffer and backlog including information about the next buffer
+	 * @return the next data type if the next buffer can be pulled immediately or null
	 */
-	private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+	private Buffer.DataType getNextDataType(BufferAndBacklog bufferAndBacklog) {
// BEWARE: this must be in sync with #isAvailable()!
Review comment:
I expanded the comment (now javadoc) to clearly state the contract.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]