[FLINK-7468][network] Implement sender backlog logic for credit-based THis closes #4559.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/718a2ba0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/718a2ba0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/718a2ba0 Branch: refs/heads/master Commit: 718a2ba0bb1867d7053d813eb76cf6796b35d86f Parents: 544c970 Author: Zhijiang <wangzhijiang...@aliyun.com> Authored: Thu Aug 17 19:38:45 2017 +0800 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Jan 9 16:11:17 2018 +0100 ---------------------------------------------------------------------- .../netty/SequenceNumberingViewReader.java | 6 +- .../partition/PipelinedSubpartition.java | 54 ++++++++++- .../partition/PipelinedSubpartitionView.java | 6 +- .../network/partition/ResultSubpartition.java | 37 ++++++++ .../partition/ResultSubpartitionView.java | 14 +-- .../partition/SpillableSubpartition.java | 47 +++++++++- .../partition/SpillableSubpartitionView.java | 8 +- .../partition/SpilledSubpartitionView.java | 12 ++- .../partition/consumer/InputChannel.java | 12 ++- .../partition/consumer/LocalInputChannel.java | 8 +- .../partition/consumer/RemoteInputChannel.java | 2 +- .../netty/CancelPartitionRequestTest.java | 8 +- .../partition/PipelinedSubpartitionTest.java | 23 ++++- .../partition/SpillableSubpartitionTest.java | 96 ++++++++++++++------ .../partition/SpilledSubpartitionViewTest.java | 6 +- .../network/partition/SubpartitionTestBase.java | 20 +++- .../IteratorWrappingTestSingleInputGate.java | 4 +- .../partition/consumer/SingleInputGateTest.java | 3 +- .../partition/consumer/TestInputChannel.java | 6 +- .../network/util/TestSubpartitionConsumer.java | 14 +-- .../consumer/StreamTestSingleInputGate.java | 6 +- 21 files changed, 309 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 6d95ca5..fcbfb21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -86,13 +86,13 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener { } public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException { - Buffer next = subpartitionView.getNextBuffer(); + BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next != null) { long remaining = numBuffersAvailable.decrementAndGet(); sequenceNumber++; if (remaining >= 0) { - return new BufferAndAvailability(next, remaining > 0); + return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); } else { throw new IllegalStateException("no buffer available"); } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index c1d6f13..9f5a40c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -25,6 +26,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayDeque; @@ -52,6 +55,10 @@ class PipelinedSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition. */ + @GuardedBy("buffers") + private int buffersInBacklog; + // ------------------------------------------------------------------------ PipelinedSubpartition(int index, ResultPartition parent) { @@ -75,6 +82,7 @@ class PipelinedSubpartition extends ResultSubpartition { buffers.add(buffer); reader = readView; updateStatistics(buffer); + increaseBuffersInBacklog(buffer); } // Notify the listener outside of the synchronized block @@ -144,9 +152,17 @@ class PipelinedSubpartition extends ResultSubpartition { } } - Buffer pollBuffer() { + @Nullable + BufferAndBacklog pollBuffer() { synchronized (buffers) { - return buffers.pollFirst(); + Buffer buffer = buffers.pollFirst(); + decreaseBuffersInBacklog(buffer); + + if (buffer != null) { + return new BufferAndBacklog(buffer, buffersInBacklog); + } else { + return null; + } } } @@ -163,6 +179,36 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override + @VisibleForTesting + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + /** + * Decreases the number of non-event buffers by one after fetching a non-event + * buffer from this subpartition. + */ + private void decreaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + } + + /** + * Increases the number of non-event buffers by one after adding a non-event + * buffer into this subpartition. + */ + private void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } + + @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { final int queueSize; @@ -206,8 +252,8 @@ class PipelinedSubpartition extends ResultSubpartition { } return String.format( - "PipelinedSubpartition [number of buffers: %d (%d bytes), finished? %s, read view? %s]", - numBuffers, numBytes, finished, hasReadView); + "PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]", + numBuffers, numBytes, buffersInBacklog, finished, hasReadView); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index fda2135..2aafd3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -18,8 +18,9 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,8 +45,9 @@ class PipelinedSubpartitionView implements ResultSubpartitionView { this.isReleased = new AtomicBoolean(); } + @Nullable @Override - public Buffer getNextBuffer() { + public BufferAndBacklog getNextBuffer() { return parent.pollBuffer(); } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index e73082a..e42bd90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -18,10 +18,13 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.buffer.Buffer; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A single subpartition of a {@link ResultPartition} instance. */ @@ -95,10 +98,44 @@ public abstract class ResultSubpartition { abstract public boolean isReleased(); /** + * Gets the number of non-event buffers in this subpartition. + * + * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access + * scenarios since it does not make any concurrency guarantees. + */ + @VisibleForTesting + abstract public int getBuffersInBacklog(); + + /** * Makes a best effort to get the current size of the queue. * This method must not acquire locks or interfere with the task and network threads in * any way. */ abstract public int unsynchronizedGetNumberOfQueuedBuffers(); + // ------------------------------------------------------------------------ + + /** + * A combination of a {@link Buffer} and the backlog length indicating + * how many non-event buffers are available in the subpartition. + */ + public static final class BufferAndBacklog { + + private final Buffer buffer; + private final int buffersInBacklog; + + public BufferAndBacklog(Buffer buffer, int buffersInBacklog) { + this.buffer = checkNotNull(buffer); + this.buffersInBacklog = buffersInBacklog; + } + + public Buffer buffer() { + return buffer; + } + + public int buffersInBacklog() { + return buffersInBacklog; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index 98be90f..fb31592 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import javax.annotation.Nullable; import java.io.IOException; /** @@ -29,16 +31,17 @@ public interface ResultSubpartitionView { /** * Returns the next {@link Buffer} instance of this queue iterator. - * <p> - * If there is currently no instance available, it will return <code>null</code>. + * + * <p>If there is currently no instance available, it will return <code>null</code>. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. - * <p> - * <strong>Important</strong>: The consumer has to make sure that each + * + * <p><strong>Important</strong>: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. */ - Buffer getNextBuffer() throws IOException, InterruptedException; + @Nullable + BufferAndBacklog getNextBuffer() throws IOException, InterruptedException; void notifyBuffersAvailable(long buffers) throws IOException; @@ -49,5 +52,4 @@ public interface ResultSubpartitionView { boolean isReleased(); Throwable getFailureCause(); - } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 4a8e165..e977f60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayDeque; @@ -77,6 +79,10 @@ class SpillableSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private int buffersInBacklog; + /** The read view to consume this subpartition. */ private ResultSubpartitionView readView; @@ -102,6 +108,7 @@ class SpillableSubpartition extends ResultSubpartition { // the read views. If you ever remove this line here, // make sure to still count the number of buffers. updateStatistics(buffer); + increaseBuffersInBacklog(buffer); return true; } @@ -114,6 +121,7 @@ class SpillableSubpartition extends ResultSubpartition { synchronized (buffers) { // See the note above, but only do this if the buffer was correctly added! updateStatistics(buffer); + increaseBuffersInBacklog(buffer); } } finally { buffer.recycle(); @@ -247,6 +255,39 @@ class SpillableSubpartition extends ResultSubpartition { } @Override + @VisibleForTesting + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + /** + * Decreases the number of non-event buffers by one after fetching a non-event + * buffer from this subpartition (for access by the subpartition views). + * + * @return backlog after the operation + */ + public int decreaseBuffersInBacklog(Buffer buffer) { + synchronized (buffers) { + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + return buffersInBacklog; + } + } + + /** + * Increases the number of non-event buffers by one after adding a non-event + * buffer into this subpartition. + */ + private void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } + + @Override public int unsynchronizedGetNumberOfQueuedBuffers() { // since we do not synchronize, the size may actually be lower than 0! return Math.max(buffers.size(), 0); @@ -255,9 +296,9 @@ class SpillableSubpartition extends ResultSubpartition { @Override public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + - "finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, - spillWriter != null); + "%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]", + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), + buffersInBacklog, isFinished, readView != null, spillWriter != null); } } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 6781902..2527273 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -128,8 +130,9 @@ class SpillableSubpartitionView implements ResultSubpartitionView { } } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { synchronized (buffers) { if (isReleased.get()) { return null; @@ -141,7 +144,8 @@ class SpillableSubpartitionView implements ResultSubpartitionView { listener.notifyBuffersAvailable(1); } - return current; + int newBacklog = parent.decreaseBuffersInBacklog(current); + return new BufferAndBacklog(current, newBacklog); } } // else: spilled http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index fec0f2a..20e0406 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.util.event.NotificationListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayDeque; import java.util.Queue; @@ -51,7 +53,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class); /** The subpartition this view belongs to. */ - private final ResultSubpartition parent; + private final SpillableSubpartition parent; /** Writer for spills. */ private final BufferFileWriter spillWriter; @@ -75,7 +77,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis private volatile boolean isSpillInProgress = true; SpilledSubpartitionView( - ResultSubpartition parent, + SpillableSubpartition parent, int memorySegmentSize, BufferFileWriter spillWriter, long numberOfSpilledBuffers, @@ -113,8 +115,9 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers); } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { if (fileReader.hasReachedEndOfFile() || isSpillInProgress) { return null; } @@ -124,7 +127,8 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis Buffer buffer = bufferPool.requestBufferBlocking(); fileReader.readInto(buffer); - return buffer; + int newBacklog = parent.decreaseBuffersInBacklog(buffer); + return new BufferAndBacklog(buffer, newBacklog); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 68b05d4..7b7edf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -242,16 +242,20 @@ public abstract class InputChannel { // ------------------------------------------------------------------------ /** - * A combination of a {@link Buffer} and a flag indicating availability of further buffers. + * A combination of a {@link Buffer} and a flag indicating availability of further buffers, + * and the backlog length indicating how many non-event buffers are available in the + * subpartition. */ public static final class BufferAndAvailability { private final Buffer buffer; private final boolean moreAvailable; + private final int buffersInBacklog; - public BufferAndAvailability(Buffer buffer, boolean moreAvailable) { + public BufferAndAvailability(Buffer buffer, boolean moreAvailable, int buffersInBacklog) { this.buffer = checkNotNull(buffer); this.moreAvailable = moreAvailable; + this.buffersInBacklog = buffersInBacklog; } public Buffer buffer() { @@ -261,5 +265,9 @@ public abstract class InputChannel { public boolean moreAvailable() { return moreAvailable; } + + public int buffersInBacklog() { + return buffersInBacklog; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 71b3653..8505666 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -21,12 +21,12 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ProducerFailedException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.slf4j.Logger; @@ -179,7 +179,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit subpartitionView = checkAndWaitForSubpartitionView(); } - Buffer next = subpartitionView.getNextBuffer(); + BufferAndBacklog next = subpartitionView.getNextBuffer(); if (next == null) { if (subpartitionView.isReleased()) { @@ -195,8 +195,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit long remaining = numBuffersAvailable.decrementAndGet(); if (remaining >= 0) { - numBytesIn.inc(next.getSizeUnsafe()); - return new BufferAndAvailability(next, remaining > 0); + numBytesIn.inc(next.buffer().getSizeUnsafe()); + return new BufferAndAvailability(next.buffer(), remaining > 0, next.buffersInBacklog()); } else if (subpartitionView.isReleased()) { throw new ProducerFailedException(subpartitionView.getFailureCause()); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 7605075..397f407 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -196,7 +196,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, } numBytesIn.inc(next.getSizeUnsafe()); - return new BufferAndAvailability(next, remaining > 0); + return new BufferAndAvailability(next, remaining > 0, getSenderBacklog()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index c9f063b..4acdb36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; @@ -36,6 +36,7 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -185,9 +186,10 @@ public class CancelPartitionRequestTest { this.sync = checkNotNull(sync); } + @Nullable @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { - return bufferProvider.requestBufferBlocking(); + public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException { + return new BufferAndBacklog(bufferProvider.requestBufferBlocking(), 0); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 6d36aa6..9edba35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; @@ -106,18 +107,38 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + // ...should have resulted in a notification verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result - assertNotNull(view.getNextBuffer()); + BufferAndBacklog read = view.getNextBuffer(); + assertNotNull(read); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); // Add data to the queue... subpartition.add(createBuffer()); + assertEquals(2, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + + // Add event to the queue... + Buffer event = createBuffer(); + event.tagAsEvent(); + subpartition.add(event); + + assertEquals(3, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + verify(listener, times(3)).notifyBuffersAvailable(eq(1L)); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index c50b361..57aa82f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; @@ -203,6 +204,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(3, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); // now the buffer may be freed, depending on the timing of the write operation @@ -211,44 +216,65 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(3, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); // + one EndOfPartitionEvent assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener()); SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); - Buffer read = reader.getNextBuffer(); + BufferAndBacklog read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(2, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(1, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); read = reader.getNextBuffer(); assertNotNull(read); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // End of partition read = reader.getNextBuffer(); assertNotNull(read); - assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertEquals(EndOfPartitionEvent.class, + EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // finally check that the buffer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs @@ -277,6 +303,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(4, partition.getTotalNumberOfBuffers()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); + AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); @@ -284,9 +314,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(1, listener.getNumNotifiedBuffers()); assertFalse(buffer.isRecycled()); - Buffer read = reader.getNextBuffer(); - assertSame(buffer, read); - read.recycle(); + BufferAndBacklog read = reader.getNextBuffer(); + assertNotNull(read); + assertSame(buffer, read.buffer()); + assertEquals(2, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + read.buffer().recycle(); assertEquals(2, listener.getNumNotifiedBuffers()); assertFalse(buffer.isRecycled()); @@ -295,31 +328,40 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertFalse(buffer.isRecycled()); // still one in the reader! // still same statistics: assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(2, partition.getBuffersInBacklog()); assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); listener.awaitNotifications(4, 30_000); assertEquals(4, listener.getNumNotifiedBuffers()); read = reader.getNextBuffer(); - assertSame(buffer, read); - read.recycle(); + assertNotNull(read); + assertEquals(1, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertSame(buffer, read.buffer()); + read.buffer().recycle(); // now the buffer may be freed, depending on the timing of the write operation // -> let's do this check at the end of the test (to save some time) read = reader.getNextBuffer(); assertNotNull(read); - assertNotSame(buffer, read); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertNotSame(buffer, read.buffer()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // End of partition read = reader.getNextBuffer(); assertNotNull(read); - assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); - assertFalse(read.isRecycled()); - read.recycle(); - assertTrue(read.isRecycled()); + assertEquals(0, partition.getBuffersInBacklog()); + assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); + assertEquals(EndOfPartitionEvent.class, + EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass()); + assertFalse(read.buffer().isRecycled()); + read.buffer().recycle(); + assertTrue(read.buffer().isRecycled()); // finally check that the buffer has been freed after a successful (or failed) write final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java index 69d19fc..89a4e03 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java @@ -73,7 +73,7 @@ public class SpilledSubpartitionViewTest { false, new TestConsumerCallback.RecyclingCallback()); SpilledSubpartitionView view = new SpilledSubpartitionView( - mock(ResultSubpartition.class), + mock(SpillableSubpartition.class), viewBufferPool.getMemorySegmentSize(), writer, numberOfBuffersToWrite + 1, // +1 for end-of-partition @@ -99,7 +99,7 @@ public class SpilledSubpartitionViewTest { false, new TestConsumerCallback.RecyclingCallback()); SpilledSubpartitionView view = new SpilledSubpartitionView( - mock(ResultSubpartition.class), + mock(SpillableSubpartition.class), 32 * 1024, writer, numberOfBuffersToWrite + 1, @@ -140,7 +140,7 @@ public class SpilledSubpartitionViewTest { BufferProvider inputBuffers = new TestPooledBufferProvider(2); - ResultSubpartition parent = mock(ResultSubpartition.class); + SpillableSubpartition parent = mock(SpillableSubpartition.class); // Wait for writers to finish for (BufferFileWriter writer : writers) { http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index d084f62..2d56258 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.util.TestLogger; @@ -51,8 +53,15 @@ public abstract class SubpartitionTestBase extends TestLogger { assertEquals(1, subpartition.getTotalNumberOfBuffers()); assertEquals(4, subpartition.getTotalNumberOfBytes()); - assertFalse(subpartition.add(mock(Buffer.class))); assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + + assertFalse(subpartition.add(buffer)); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(4, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { @@ -70,8 +79,15 @@ public abstract class SubpartitionTestBase extends TestLogger { assertEquals(0, subpartition.getTotalNumberOfBuffers()); assertEquals(0, subpartition.getTotalNumberOfBytes()); - assertFalse(subpartition.add(mock(Buffer.class))); assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + + assertFalse(subpartition.add(buffer)); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(0, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index 16285b7..5fe835a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -75,11 +75,11 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e hasData = inputIterator.next(reuse) != null; // Call getCurrentBuffer to ensure size is set - return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true); + return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true, 0); } else { when(inputChannel.getInputChannel().isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } } }; http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index da649cd..59fa7a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -124,7 +125,7 @@ public class SingleInputGateTest { final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class); when(iterator.getNextBuffer()).thenReturn( - new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class))); + new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)), 0)); final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager.createSubpartitionView( http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index a6597a2..43ac7a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -55,9 +55,9 @@ public class TestInputChannel { public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException { if (stubbing == null) { - stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); + stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); } else { - stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true)); + stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true, 0)); } return this; @@ -77,7 +77,7 @@ public class TestInputChannel { // Return true after finishing when(mock.isReleased()).thenReturn(true); - return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java index 676a304..37137ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.io.network.util; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import java.util.Random; @@ -92,24 +92,24 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila } } - final Buffer buffer = subpartitionView.getNextBuffer(); + final BufferAndBacklog bufferAndBacklog = subpartitionView.getNextBuffer(); if (isSlowConsumer) { Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1)); } - if (buffer != null) { + if (bufferAndBacklog != null) { numBuffersAvailable.decrementAndGet(); - if (buffer.isBuffer()) { - callback.onBuffer(buffer); + if (bufferAndBacklog.buffer().isBuffer()) { + callback.onBuffer(bufferAndBacklog.buffer()); } else { - final AbstractEvent event = EventSerializer.fromBuffer(buffer, + final AbstractEvent event = EventSerializer.fromBuffer(bufferAndBacklog.buffer(), getClass().getClassLoader()); callback.onEvent(event); - buffer.recycle(); + bufferAndBacklog.buffer().recycle(); if (event.getClass() == EndOfPartitionEvent.class) { subpartitionView.notifySubpartitionConsumed(); http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index f19d59d..11d8f11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -98,7 +98,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { if (input != null && input.isStreamEnd()) { when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn( true); - return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); + return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0); } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); @@ -107,10 +107,10 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { recordSerializer.addRecord(delegate); // Call getCurrentBuffer to ensure size is set - return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false); + return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false, 0); } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); - return new BufferAndAvailability(EventSerializer.toBuffer(event), false); + return new BufferAndAvailability(EventSerializer.toBuffer(event), false, 0); } else { synchronized (inputQueues[channelIndex]) { inputQueues[channelIndex].wait();