[FLINK-7468][network] Implement sender backlog logic for credit-based

This closes #4559.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/718a2ba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/718a2ba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/718a2ba0

Branch: refs/heads/master
Commit: 718a2ba0bb1867d7053d813eb76cf6796b35d86f
Parents: 544c970
Author: Zhijiang <wangzhijiang...@aliyun.com>
Authored: Thu Aug 17 19:38:45 2017 +0800
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Jan 9 16:11:17 2018 +0100

----------------------------------------------------------------------
 .../netty/SequenceNumberingViewReader.java      |  6 +-
 .../partition/PipelinedSubpartition.java        | 54 ++++++++++-
 .../partition/PipelinedSubpartitionView.java    |  6 +-
 .../network/partition/ResultSubpartition.java   | 37 ++++++++
 .../partition/ResultSubpartitionView.java       | 14 +--
 .../partition/SpillableSubpartition.java        | 47 +++++++++-
 .../partition/SpillableSubpartitionView.java    |  8 +-
 .../partition/SpilledSubpartitionView.java      | 12 ++-
 .../partition/consumer/InputChannel.java        | 12 ++-
 .../partition/consumer/LocalInputChannel.java   |  8 +-
 .../partition/consumer/RemoteInputChannel.java  |  2 +-
 .../netty/CancelPartitionRequestTest.java       |  8 +-
 .../partition/PipelinedSubpartitionTest.java    | 23 ++++-
 .../partition/SpillableSubpartitionTest.java    | 96 ++++++++++++++------
 .../partition/SpilledSubpartitionViewTest.java  |  6 +-
 .../network/partition/SubpartitionTestBase.java | 20 +++-
 .../IteratorWrappingTestSingleInputGate.java    |  4 +-
 .../partition/consumer/SingleInputGateTest.java |  3 +-
 .../partition/consumer/TestInputChannel.java    |  6 +-
 .../network/util/TestSubpartitionConsumer.java  | 14 +--
 .../consumer/StreamTestSingleInputGate.java     |  6 +-
 21 files changed, 309 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 6d95ca5..fcbfb21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -86,13 +86,13 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener {
        }
 
        public BufferAndAvailability getNextBuffer() throws IOException, 
InterruptedException {
-               Buffer next = subpartitionView.getNextBuffer();
+               BufferAndBacklog next = subpartitionView.getNextBuffer();
                if (next != null) {
                        long remaining = numBuffersAvailable.decrementAndGet();
                        sequenceNumber++;
 
                        if (remaining >= 0) {
-                               return new BufferAndAvailability(next, 
remaining > 0);
+                               return new BufferAndAvailability(next.buffer(), 
remaining > 0, next.buffersInBacklog());
                        } else {
                                throw new IllegalStateException("no buffer 
available");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c1d6f13..9f5a40c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -25,6 +26,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -52,6 +55,10 @@ class PipelinedSubpartition extends ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
+       /** The number of non-event buffers currently in this subpartition. */
+       @GuardedBy("buffers")
+       private int buffersInBacklog;
+
        // 
------------------------------------------------------------------------
 
        PipelinedSubpartition(int index, ResultPartition parent) {
@@ -75,6 +82,7 @@ class PipelinedSubpartition extends ResultSubpartition {
                        buffers.add(buffer);
                        reader = readView;
                        updateStatistics(buffer);
+                       increaseBuffersInBacklog(buffer);
                }
 
                // Notify the listener outside of the synchronized block
@@ -144,9 +152,17 @@ class PipelinedSubpartition extends ResultSubpartition {
                }
        }
 
-       Buffer pollBuffer() {
+       @Nullable
+       BufferAndBacklog pollBuffer() {
                synchronized (buffers) {
-                       return buffers.pollFirst();
+                       Buffer buffer = buffers.pollFirst();
+                       decreaseBuffersInBacklog(buffer);
+
+                       if (buffer != null) {
+                               return new BufferAndBacklog(buffer, 
buffersInBacklog);
+                       } else {
+                               return null;
+                       }
                }
        }
 
@@ -163,6 +179,36 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
+       @VisibleForTesting
+       public int getBuffersInBacklog() {
+               return buffersInBacklog;
+       }
+
+       /**
+        * Decreases the number of non-event buffers by one after fetching a 
non-event
+        * buffer from this subpartition.
+        */
+       private void decreaseBuffersInBacklog(Buffer buffer) {
+               assert Thread.holdsLock(buffers);
+
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog--;
+               }
+       }
+
+       /**
+        * Increases the number of non-event buffers by one after adding a 
non-event
+        * buffer into this subpartition.
+        */
+       private void increaseBuffersInBacklog(Buffer buffer) {
+               assert Thread.holdsLock(buffers);
+
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog++;
+               }
+       }
+
+       @Override
        public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
                final int queueSize;
 
@@ -206,8 +252,8 @@ class PipelinedSubpartition extends ResultSubpartition {
                }
 
                return String.format(
-                               "PipelinedSubpartition [number of buffers: %d 
(%d bytes), finished? %s, read view? %s]",
-                               numBuffers, numBytes, finished, hasReadView);
+                       "PipelinedSubpartition [number of buffers: %d (%d 
bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+                       numBuffers, numBytes, buffersInBacklog, finished, 
hasReadView);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index fda2135..2aafd3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -44,8 +45,9 @@ class PipelinedSubpartitionView implements 
ResultSubpartitionView {
                this.isReleased = new AtomicBoolean();
        }
 
+       @Nullable
        @Override
-       public Buffer getNextBuffer() {
+       public BufferAndBacklog getNextBuffer() {
                return parent.pollBuffer();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index e73082a..e42bd90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A single subpartition of a {@link ResultPartition} instance.
  */
@@ -95,10 +98,44 @@ public abstract class ResultSubpartition {
        abstract public boolean isReleased();
 
        /**
+        * Gets the number of non-event buffers in this subpartition.
+        *
+        * <p><strong>Beware:</strong> This method should only be used in tests 
in non-concurrent access
+        * scenarios since it does not make any concurrency guarantees.
+        */
+       @VisibleForTesting
+       abstract public int getBuffersInBacklog();
+
+       /**
         * Makes a best effort to get the current size of the queue.
         * This method must not acquire locks or interfere with the task and 
network threads in
         * any way.
         */
        abstract public int unsynchronizedGetNumberOfQueuedBuffers();
 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A combination of a {@link Buffer} and the backlog length indicating
+        * how many non-event buffers are available in the subpartition.
+        */
+       public static final class BufferAndBacklog {
+
+               private final Buffer buffer;
+               private final int buffersInBacklog;
+
+               public BufferAndBacklog(Buffer buffer, int buffersInBacklog) {
+                       this.buffer = checkNotNull(buffer);
+                       this.buffersInBacklog = buffersInBacklog;
+               }
+
+               public Buffer buffer() {
+                       return buffer;
+               }
+
+               public int buffersInBacklog() {
+                       return buffersInBacklog;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 98be90f..fb31592 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -29,16 +31,17 @@ public interface ResultSubpartitionView {
 
        /**
         * Returns the next {@link Buffer} instance of this queue iterator.
-        * <p>
-        * If there is currently no instance available, it will return 
<code>null</code>.
+        *
+        * <p>If there is currently no instance available, it will return 
<code>null</code>.
         * This might happen for example when a pipelined queue producer is 
slower
         * than the consumer or a spilled queue needs to read in more data.
-        * <p>
-        * <strong>Important</strong>: The consumer has to make sure that each
+        *
+        * <p><strong>Important</strong>: The consumer has to make sure that 
each
         * buffer instance will eventually be recycled with {@link 
Buffer#recycle()}
         * after it has been consumed.
         */
-       Buffer getNextBuffer() throws IOException, InterruptedException;
+       @Nullable
+       BufferAndBacklog getNextBuffer() throws IOException, 
InterruptedException;
 
        void notifyBuffersAvailable(long buffers) throws IOException;
 
@@ -49,5 +52,4 @@ public interface ResultSubpartitionView {
        boolean isReleased();
 
        Throwable getFailureCause();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a8e165..e977f60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
@@ -77,6 +79,10 @@ class SpillableSubpartition extends ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
+       /** The number of non-event buffers currently in this subpartition */
+       @GuardedBy("buffers")
+       private int buffersInBacklog;
+
        /** The read view to consume this subpartition. */
        private ResultSubpartitionView readView;
 
@@ -102,6 +108,7 @@ class SpillableSubpartition extends ResultSubpartition {
                                // the read views. If you ever remove this line 
here,
                                // make sure to still count the number of 
buffers.
                                updateStatistics(buffer);
+                               increaseBuffersInBacklog(buffer);
 
                                return true;
                        }
@@ -114,6 +121,7 @@ class SpillableSubpartition extends ResultSubpartition {
                        synchronized (buffers) {
                                // See the note above, but only do this if the 
buffer was correctly added!
                                updateStatistics(buffer);
+                               increaseBuffersInBacklog(buffer);
                        }
                } finally {
                        buffer.recycle();
@@ -247,6 +255,39 @@ class SpillableSubpartition extends ResultSubpartition {
        }
 
        @Override
+       @VisibleForTesting
+       public int getBuffersInBacklog() {
+               return buffersInBacklog;
+       }
+
+       /**
+        * Decreases the number of non-event buffers by one after fetching a 
non-event
+        * buffer from this subpartition (for access by the subpartition views).
+        *
+        * @return backlog after the operation
+        */
+       public int decreaseBuffersInBacklog(Buffer buffer) {
+               synchronized (buffers) {
+                       if (buffer != null && buffer.isBuffer()) {
+                               buffersInBacklog--;
+                       }
+                       return buffersInBacklog;
+               }
+       }
+
+       /**
+        * Increases the number of non-event buffers by one after adding a 
non-event
+        * buffer into this subpartition.
+        */
+       private void increaseBuffersInBacklog(Buffer buffer) {
+               assert Thread.holdsLock(buffers);
+
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog++;
+               }
+       }
+
+       @Override
        public int unsynchronizedGetNumberOfQueuedBuffers() {
                // since we do not synchronize, the size may actually be lower 
than 0!
                return Math.max(buffers.size(), 0);
@@ -255,9 +296,9 @@ class SpillableSubpartition extends ResultSubpartition {
        @Override
        public String toString() {
                return String.format("SpillableSubpartition [%d number of 
buffers (%d bytes)," +
-                                               "finished? %s, read view? %s, 
spilled? %s]",
-                               getTotalNumberOfBuffers(), 
getTotalNumberOfBytes(), isFinished, readView != null,
-                               spillWriter != null);
+                               "%d number of buffers in backlog, finished? %s, 
read view? %s, spilled? %s]",
+                       getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+                       buffersInBacklog, isFinished, readView != null, 
spillWriter != null);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6781902..2527273 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -128,8 +130,9 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                }
        }
 
+       @Nullable
        @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
+       public BufferAndBacklog getNextBuffer() throws IOException, 
InterruptedException {
                synchronized (buffers) {
                        if (isReleased.get()) {
                                return null;
@@ -141,7 +144,8 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                                        listener.notifyBuffersAvailable(1);
                                }
 
-                               return current;
+                               int newBacklog = 
parent.decreaseBuffersInBacklog(current);
+                               return new BufferAndBacklog(current, 
newBacklog);
                        }
                } // else: spilled
 

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index fec0f2a..20e0406 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
@@ -29,6 +30,7 @@ import 
org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
@@ -51,7 +53,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
        private static final Logger LOG = 
LoggerFactory.getLogger(SpilledSubpartitionView.class);
 
        /** The subpartition this view belongs to. */
-       private final ResultSubpartition parent;
+       private final SpillableSubpartition parent;
 
        /** Writer for spills. */
        private final BufferFileWriter spillWriter;
@@ -75,7 +77,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
        private volatile boolean isSpillInProgress = true;
 
        SpilledSubpartitionView(
-               ResultSubpartition parent,
+               SpillableSubpartition parent,
                int memorySegmentSize,
                BufferFileWriter spillWriter,
                long numberOfSpilledBuffers,
@@ -113,8 +115,9 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
                LOG.debug("Finished spilling. Notified about {} available 
buffers.", numberOfSpilledBuffers);
        }
 
+       @Nullable
        @Override
-       public Buffer getNextBuffer() throws IOException, InterruptedException {
+       public BufferAndBacklog getNextBuffer() throws IOException, 
InterruptedException {
                if (fileReader.hasReachedEndOfFile() || isSpillInProgress) {
                        return null;
                }
@@ -124,7 +127,8 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
                Buffer buffer = bufferPool.requestBufferBlocking();
                fileReader.readInto(buffer);
 
-               return buffer;
+               int newBacklog = parent.decreaseBuffersInBacklog(buffer);
+               return new BufferAndBacklog(buffer, newBacklog);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 68b05d4..7b7edf7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -242,16 +242,20 @@ public abstract class InputChannel {
        // 
------------------------------------------------------------------------
 
        /**
-        * A combination of a {@link Buffer} and a flag indicating availability 
of further buffers.
+        * A combination of a {@link Buffer} and a flag indicating availability 
of further buffers,
+        * and the backlog length indicating how many non-event buffers are 
available in the
+        * subpartition.
         */
        public static final class BufferAndAvailability {
 
                private final Buffer buffer;
                private final boolean moreAvailable;
+               private final int buffersInBacklog;
 
-               public BufferAndAvailability(Buffer buffer, boolean 
moreAvailable) {
+               public BufferAndAvailability(Buffer buffer, boolean 
moreAvailable, int buffersInBacklog) {
                        this.buffer = checkNotNull(buffer);
                        this.moreAvailable = moreAvailable;
+                       this.buffersInBacklog = buffersInBacklog;
                }
 
                public Buffer buffer() {
@@ -261,5 +265,9 @@ public abstract class InputChannel {
                public boolean moreAvailable() {
                        return moreAvailable;
                }
+
+               public int buffersInBacklog() {
+                       return buffersInBacklog;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 71b3653..8505666 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -21,12 +21,12 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.slf4j.Logger;
@@ -179,7 +179,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                        subpartitionView = checkAndWaitForSubpartitionView();
                }
 
-               Buffer next = subpartitionView.getNextBuffer();
+               BufferAndBacklog next = subpartitionView.getNextBuffer();
 
                if (next == null) {
                        if (subpartitionView.isReleased()) {
@@ -195,8 +195,8 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                long remaining = numBuffersAvailable.decrementAndGet();
 
                if (remaining >= 0) {
-                       numBytesIn.inc(next.getSizeUnsafe());
-                       return new BufferAndAvailability(next, remaining > 0);
+                       numBytesIn.inc(next.buffer().getSizeUnsafe());
+                       return new BufferAndAvailability(next.buffer(), 
remaining > 0, next.buffersInBacklog());
                } else if (subpartitionView.isReleased()) {
                        throw new 
ProducerFailedException(subpartitionView.getFailureCause());
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 7605075..397f407 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -196,7 +196,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                }
 
                numBytesIn.inc(next.getSizeUnsafe());
-               return new BufferAndAvailability(next, remaining > 0);
+               return new BufferAndAvailability(next, remaining > 0, 
getSenderBacklog());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index c9f063b..4acdb36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
@@ -36,6 +36,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -185,9 +186,10 @@ public class CancelPartitionRequestTest {
                        this.sync = checkNotNull(sync);
                }
 
+               @Nullable
                @Override
-               public Buffer getNextBuffer() throws IOException, InterruptedException {
-                       return bufferProvider.requestBufferBlocking();
+               public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
+                       return new BufferAndBacklog(bufferProvider.requestBufferBlocking(), 0);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 6d36aa6..9edba35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
@@ -106,18 +107,38 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(1, subpartition.getTotalNumberOfBuffers());
                assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 
+               assertEquals(1, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+
                // ...should have resulted in a notification
                verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
                // ...and one available result
-               assertNotNull(view.getNextBuffer());
+               BufferAndBacklog read = view.getNextBuffer();
+               assertNotNull(read);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+               assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
                assertNull(view.getNextBuffer());
+               assertEquals(0, subpartition.getBuffersInBacklog());
 
                // Add data to the queue...
                subpartition.add(createBuffer());
+
                assertEquals(2, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
                assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
                verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
+
+               // Add event to the queue...
+               Buffer event = createBuffer();
+               event.tagAsEvent();
+               subpartition.add(event);
+
+               assertEquals(3, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+               verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index c50b361..57aa82f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -203,6 +204,10 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(3, partition.getTotalNumberOfBuffers());
                assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
+               assertEquals(3, partition.getTotalNumberOfBuffers());
+               assertEquals(3, partition.getBuffersInBacklog());
+               assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
                assertFalse(buffer.isRecycled());
                assertEquals(3, partition.releaseMemory());
                // now the buffer may be freed, depending on the timing of the 
write operation
@@ -211,44 +216,65 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(3, partition.getTotalNumberOfBuffers());
                assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
+               // now the buffer may be freed, depending on the timing of the write operation
+               // -> let's do this check at the end of the test (to save some time)
+               // still same statistics
+               assertEquals(3, partition.getTotalNumberOfBuffers());
+               assertEquals(3, partition.getBuffersInBacklog());
+               assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
                partition.finish();
                // + one EndOfPartitionEvent
                assertEquals(4, partition.getTotalNumberOfBuffers());
                assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
+               // + one EndOfPartitionEvent
+               assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(3, partition.getBuffersInBacklog());
+               assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
+
                BufferAvailabilityListener listener = spy(new 
AwaitableBufferAvailablityListener());
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
-               Buffer read = reader.getNextBuffer();
+               BufferAndBacklog read = reader.getNextBuffer();
                assertNotNull(read);
+               assertEquals(2, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
                assertNotSame(buffer, read);
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                read = reader.getNextBuffer();
                assertNotNull(read);
+               assertEquals(1, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
                assertNotSame(buffer, read);
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                read = reader.getNextBuffer();
                assertNotNull(read);
+               assertEquals(0, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
                assertNotSame(buffer, read);
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                // End of partition
                read = reader.getNextBuffer();
                assertNotNull(read);
-               assertEquals(EndOfPartitionEvent.class, 
EventSerializer.fromBuffer(read, 
ClassLoader.getSystemClassLoader()).getClass());
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertEquals(0, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
+               assertEquals(EndOfPartitionEvent.class,
+                       EventSerializer.fromBuffer(read.buffer(), 
ClassLoader.getSystemClassLoader()).getClass());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                // finally check that the buffer has been freed after a 
successful (or failed) write
                final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs
@@ -277,6 +303,10 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(4, partition.getTotalNumberOfBuffers());
                assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
+               assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(3, partition.getBuffersInBacklog());
+               assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
+
                AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
                SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(listener);
 
@@ -284,9 +314,12 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(1, listener.getNumNotifiedBuffers());
                assertFalse(buffer.isRecycled());
 
-               Buffer read = reader.getNextBuffer();
-               assertSame(buffer, read);
-               read.recycle();
+               BufferAndBacklog read = reader.getNextBuffer();
+               assertNotNull(read);
+               assertSame(buffer, read.buffer());
+               assertEquals(2, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
+               read.buffer().recycle();
                assertEquals(2, listener.getNumNotifiedBuffers());
                assertFalse(buffer.isRecycled());
 
@@ -295,31 +328,40 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertFalse(buffer.isRecycled()); // still one in the reader!
                // still same statistics:
                assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(2, partition.getBuffersInBacklog());
                assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
                listener.awaitNotifications(4, 30_000);
                assertEquals(4, listener.getNumNotifiedBuffers());
 
                read = reader.getNextBuffer();
-               assertSame(buffer, read);
-               read.recycle();
+               assertNotNull(read);
+               assertEquals(1, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
+               assertSame(buffer, read.buffer());
+               read.buffer().recycle();
                // now the buffer may be freed, depending on the timing of the 
write operation
                // -> let's do this check at the end of the test (to save some 
time)
 
                read = reader.getNextBuffer();
                assertNotNull(read);
-               assertNotSame(buffer, read);
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertEquals(0, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
+               assertNotSame(buffer, read.buffer());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                // End of partition
                read = reader.getNextBuffer();
                assertNotNull(read);
-               assertEquals(EndOfPartitionEvent.class, 
EventSerializer.fromBuffer(read, 
ClassLoader.getSystemClassLoader()).getClass());
-               assertFalse(read.isRecycled());
-               read.recycle();
-               assertTrue(read.isRecycled());
+               assertEquals(0, partition.getBuffersInBacklog());
+               assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
+               assertEquals(EndOfPartitionEvent.class,
+                       EventSerializer.fromBuffer(read.buffer(), 
ClassLoader.getSystemClassLoader()).getClass());
+               assertFalse(read.buffer().isRecycled());
+               read.buffer().recycle();
+               assertTrue(read.buffer().isRecycled());
 
                // finally check that the buffer has been freed after a 
successful (or failed) write
                final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 69d19fc..89a4e03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -73,7 +73,7 @@ public class SpilledSubpartitionViewTest {
                        false, new TestConsumerCallback.RecyclingCallback());
 
                SpilledSubpartitionView view = new SpilledSubpartitionView(
-                       mock(ResultSubpartition.class),
+                       mock(SpillableSubpartition.class),
                        viewBufferPool.getMemorySegmentSize(),
                        writer,
                        numberOfBuffersToWrite + 1, // +1 for end-of-partition
@@ -99,7 +99,7 @@ public class SpilledSubpartitionViewTest {
                        false, new TestConsumerCallback.RecyclingCallback());
 
                SpilledSubpartitionView view = new SpilledSubpartitionView(
-                       mock(ResultSubpartition.class),
+                       mock(SpillableSubpartition.class),
                        32 * 1024,
                        writer,
                        numberOfBuffersToWrite + 1,
@@ -140,7 +140,7 @@ public class SpilledSubpartitionViewTest {
 
                        BufferProvider inputBuffers = new 
TestPooledBufferProvider(2);
 
-                       ResultSubpartition parent = 
mock(ResultSubpartition.class);
+                       SpillableSubpartition parent = 
mock(SpillableSubpartition.class);
 
                        // Wait for writers to finish
                        for (BufferFileWriter writer : writers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index d084f62..2d56258 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.util.TestLogger;
 
@@ -51,8 +53,15 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
                        assertEquals(4, subpartition.getTotalNumberOfBytes());
 
-                       assertFalse(subpartition.add(mock(Buffer.class)));
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getBuffersInBacklog());
+                       assertEquals(4, subpartition.getTotalNumberOfBytes());
+
+                       Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+
+                       assertFalse(subpartition.add(buffer));
+                       assertEquals(1, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getBuffersInBacklog());
                        assertEquals(4, subpartition.getTotalNumberOfBytes());
                } finally {
                        if (subpartition != null) {
@@ -70,8 +79,15 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                        assertEquals(0, subpartition.getTotalNumberOfBuffers());
                        assertEquals(0, subpartition.getTotalNumberOfBytes());
 
-                       assertFalse(subpartition.add(mock(Buffer.class)));
                        assertEquals(0, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getBuffersInBacklog());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes());
+
+                       Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+
+                       assertFalse(subpartition.add(buffer));
+                       assertEquals(0, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getBuffersInBacklog());
                        assertEquals(0, subpartition.getTotalNumberOfBytes());
                } finally {
                        if (subpartition != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 16285b7..5fe835a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -75,11 +75,11 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
                                        hasData = inputIterator.next(reuse) != 
null;
 
                                        // Call getCurrentBuffer to ensure size 
is set
-                                       return new 
InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true);
+                                       return new 
InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true, 0);
                                } else {
                                        
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
 
-                                       return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
+                                       return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false, 0);
                                }
                        }
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index da649cd..59fa7a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -124,7 +125,7 @@ public class SingleInputGateTest {
 
                final ResultSubpartitionView iterator = 
mock(ResultSubpartitionView.class);
                when(iterator.getNextBuffer()).thenReturn(
-                       new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)));
+                       new BufferAndBacklog(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)), 0));
 
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager.createSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index a6597a2..43ac7a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -55,9 +55,9 @@ public class TestInputChannel {
 
        public TestInputChannel read(Buffer buffer) throws IOException, 
InterruptedException {
                if (stubbing == null) {
-                       stubbing = when(mock.getNextBuffer()).thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true));
+                       stubbing = when(mock.getNextBuffer()).thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true, 0));
                } else {
-                       stubbing = stubbing.thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true));
+                       stubbing = stubbing.thenReturn(new 
InputChannel.BufferAndAvailability(buffer, true, 0));
                }
 
                return this;
@@ -77,7 +77,7 @@ public class TestInputChannel {
                                // Return true after finishing
                                when(mock.isReleased()).thenReturn(true);
 
-                               return new 
InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
 false);
+                               return new 
InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE),
 false, 0);
                        }
                };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 676a304..37137ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.io.network.util;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.util.Random;
@@ -92,24 +92,24 @@ public class TestSubpartitionConsumer implements 
Callable<Boolean>, BufferAvaila
                                        }
                                }
 
-                               final Buffer buffer = 
subpartitionView.getNextBuffer();
+                               final BufferAndBacklog bufferAndBacklog = 
subpartitionView.getNextBuffer();
 
                                if (isSlowConsumer) {
                                        
Thread.sleep(random.nextInt(MAX_SLEEP_TIME_MS + 1));
                                }
 
-                               if (buffer != null) {
+                               if (bufferAndBacklog != null) {
                                        numBuffersAvailable.decrementAndGet();
 
-                                       if (buffer.isBuffer()) {
-                                               callback.onBuffer(buffer);
+                                       if 
(bufferAndBacklog.buffer().isBuffer()) {
+                                               
callback.onBuffer(bufferAndBacklog.buffer());
                                        } else {
-                                               final AbstractEvent event = 
EventSerializer.fromBuffer(buffer,
+                                               final AbstractEvent event = 
EventSerializer.fromBuffer(bufferAndBacklog.buffer(),
                                                        
getClass().getClassLoader());
 
                                                callback.onEvent(event);
 
-                                               buffer.recycle();
+                                               
bufferAndBacklog.buffer().recycle();
 
                                                if (event.getClass() == 
EndOfPartitionEvent.class) {
                                                        
subpartitionView.notifySubpartitionConsumed();

http://git-wip-us.apache.org/repos/asf/flink/blob/718a2ba0/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index f19d59d..11d8f11 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -98,7 +98,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
                                        if (input != null && 
input.isStreamEnd()) {
                                                
when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
                                                        true);
-                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false);
+                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false, 0);
                                        } else if (input != null && 
input.isStreamRecord()) {
                                                Object inputElement = 
input.getStreamRecord();
 
@@ -107,10 +107,10 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                                
recordSerializer.addRecord(delegate);
 
                                                // Call getCurrentBuffer to 
ensure size is set
-                                               return new 
BufferAndAvailability(recordSerializer.getCurrentBuffer(), false);
+                                               return new 
BufferAndAvailability(recordSerializer.getCurrentBuffer(), false, 0);
                                        } else if (input != null && 
input.isEvent()) {
                                                AbstractEvent event = 
input.getEvent();
-                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(event), false);
+                                               return new 
BufferAndAvailability(EventSerializer.toBuffer(event), false, 0);
                                        } else {
                                                synchronized 
(inputQueues[channelIndex]) {
                                                        
inputQueues[channelIndex].wait();

Reply via email to