[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler

The buffer provider is not needed and is most likely a leftover from prior
refactorings.

This closes #3785.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464d6f55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464d6f55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464d6f55

Branch: refs/heads/table-retraction
Commit: 464d6f553be79409bb25007255e1306884d09186
Parents: 3369578
Author: Zhijiang <wangzhijiang...@aliyun.com>
Authored: Wed Apr 26 16:18:54 2017 +0800
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue May 2 13:35:18 2017 +0200

----------------------------------------------------------------------
 .../runtime/io/network/ConnectionManager.java   |  4 +--
 .../io/network/LocalConnectionManager.java      |  5 +---
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../network/netty/NettyConnectionManager.java   |  5 ++--
 .../network/netty/PartitionRequestProtocol.java |  7 ++---
 .../netty/PartitionRequestServerHandler.java    | 19 ++----------
 .../netty/SequenceNumberingViewReader.java      |  5 +---
 .../partition/PipelinedSubpartition.java        |  3 +-
 .../io/network/partition/ResultPartition.java   |  4 +--
 .../partition/ResultPartitionManager.java       |  4 +--
 .../partition/ResultPartitionProvider.java      |  3 --
 .../network/partition/ResultSubpartition.java   |  3 +-
 .../partition/SpillableSubpartition.java        |  7 ++---
 .../partition/consumer/LocalInputChannel.java   |  2 +-
 .../netty/CancelPartitionRequestTest.java       | 13 ++++----
 .../netty/ClientTransportErrorHandlingTest.java | 10 ++-----
 .../netty/NettyConnectionManagerTest.java       |  7 ++---
 .../netty/PartitionRequestQueueTest.java        |  5 +---
 .../netty/ServerTransportErrorHandlingTest.java |  9 ++----
 .../partition/InputChannelTestUtils.java        |  7 ++---
 .../partition/PipelinedSubpartitionTest.java    |  8 ++---
 .../partition/SpillableSubpartitionTest.java    | 31 ++++++++------------
 .../network/partition/SubpartitionTestBase.java |  5 +---
 .../consumer/LocalInputChannelTest.java         |  8 ++---
 .../partition/consumer/SingleInputGateTest.java |  8 ++---
 ...SuccessAfterNetworkBuffersFailureITCase.java |  2 +-
 26 files changed, 62 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 02deb9d..1225230 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
@@ -31,8 +30,7 @@ import java.io.IOException;
 public interface ConnectionManager {
 
        void start(ResultPartitionProvider partitionProvider,
-                               TaskEventDispatcher taskEventDispatcher,
-                               NetworkBufferPool networkbufferPool) throws 
IOException;
+                               TaskEventDispatcher taskEventDispatcher) throws 
IOException;
 
        /**
         * Creates a {@link PartitionRequestClient} instance for the given 
{@link ConnectionID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 4f51a56..bece6a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
@@ -29,9 +28,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 public class LocalConnectionManager implements ConnectionManager {
 
        @Override
-       public void start(ResultPartitionProvider partitionProvider,
-                                               TaskEventDispatcher 
taskEventDispatcher,
-                                               NetworkBufferPool 
networkbufferPool) {
+       public void start(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher) {
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4d4b305..cc4cb77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -287,7 +287,7 @@ public class NetworkEnvironment {
 
                        try {
                                LOG.debug("Starting network connection 
manager");
-                               connectionManager.start(resultPartitionManager, 
taskEventDispatcher, networkBufferPool);
+                               connectionManager.start(resultPartitionManager, 
taskEventDispatcher);
                        }
                        catch (IOException t) {
                                throw new IOException("Failed to instantiate 
network connection manager.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index abee2a8..fcf618a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
@@ -45,10 +44,10 @@ public class NettyConnectionManager implements 
ConnectionManager {
        }
 
        @Override
-       public void start(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool)
+       public void start(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher)
                        throws IOException {
                PartitionRequestProtocol partitionRequestProtocol =
-                               new PartitionRequestProtocol(partitionProvider, 
taskEventDispatcher, networkbufferPool);
+                               new PartitionRequestProtocol(partitionProvider, 
taskEventDispatcher);
 
                client.init(partitionRequestProtocol, bufferPool);
                server.init(partitionRequestProtocol, bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
index a39f085..d06a018 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.ChannelHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
@@ -34,12 +33,10 @@ class PartitionRequestProtocol implements NettyProtocol {
 
        private final ResultPartitionProvider partitionProvider;
        private final TaskEventDispatcher taskEventDispatcher;
-       private final NetworkBufferPool networkbufferPool;
 
-       PartitionRequestProtocol(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) {
+       PartitionRequestProtocol(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher) {
                this.partitionProvider = partitionProvider;
                this.taskEventDispatcher = taskEventDispatcher;
-               this.networkbufferPool = networkbufferPool;
        }
 
        // +-------------------------------------------------------------------+
@@ -77,7 +74,7 @@ class PartitionRequestProtocol implements NettyProtocol {
        public ChannelHandler[] getServerChannelHandlers() {
                PartitionRequestQueue queueOfPartitionQueues = new 
PartitionRequestQueue();
                PartitionRequestServerHandler serverHandler = new 
PartitionRequestServerHandler(
-                               partitionProvider, taskEventDispatcher, 
queueOfPartitionQueues, networkbufferPool);
+                               partitionProvider, taskEventDispatcher, 
queueOfPartitionQueues);
 
                return new ChannelHandler[] {
                                messageEncoder,

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 6f56877..1bd05f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -47,36 +45,24 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
 
        private final PartitionRequestQueue outboundQueue;
 
-       private final NetworkBufferPool networkBufferPool;
-
-       private BufferPool bufferPool;
-
        PartitionRequestServerHandler(
                ResultPartitionProvider partitionProvider,
                TaskEventDispatcher taskEventDispatcher,
-               PartitionRequestQueue outboundQueue,
-               NetworkBufferPool networkBufferPool) {
+               PartitionRequestQueue outboundQueue) {
 
                this.partitionProvider = partitionProvider;
                this.taskEventDispatcher = taskEventDispatcher;
                this.outboundQueue = outboundQueue;
-               this.networkBufferPool = networkBufferPool;
        }
 
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws 
Exception {
                super.channelRegistered(ctx);
-
-               bufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
        }
 
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws 
Exception {
                super.channelUnregistered(ctx);
-
-               if (bufferPool != null) {
-                       bufferPool.lazyDestroy();
-               }
        }
 
        @Override
@@ -100,8 +86,7 @@ class PartitionRequestServerHandler extends 
SimpleChannelInboundHandler<NettyMes
                                        reader.requestSubpartitionView(
                                                partitionProvider,
                                                request.partitionId,
-                                               request.queueIndex,
-                                               bufferPool);
+                                               request.queueIndex);
                                } catch (PartitionNotFoundException notFound) {
                                        respondWithError(ctx, notFound, 
request.receiverId);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 5036bb7..6d95ca5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -60,8 +59,7 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener {
        void requestSubpartitionView(
                ResultPartitionProvider partitionProvider,
                ResultPartitionID resultPartitionId,
-               int subPartitionIndex,
-               BufferProvider bufferProvider) throws IOException {
+               int subPartitionIndex) throws IOException {
 
                synchronized (requestLock) {
                        if (subpartitionView == null) {
@@ -72,7 +70,6 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener {
                                this.subpartitionView = 
partitionProvider.createSubpartitionView(
                                        resultPartitionId,
                                        subPartitionIndex,
-                                       bufferProvider,
                                        this);
                        } else {
                                throw new IllegalStateException("Subpartition 
already requested");

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 4fd74e2..ed72b51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,7 +162,7 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public PipelinedSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException {
+       public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
                final int queueSize;
 
                synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index d207f60..9b02e4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -362,7 +362,7 @@ public class ResultPartition implements BufferPoolOwner {
        /**
         * Returns the requested subpartition.
         */
-       public ResultSubpartitionView createSubpartitionView(int index, 
BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) 
throws IOException {
+       public ResultSubpartitionView createSubpartitionView(int index, 
BufferAvailabilityListener availabilityListener) throws IOException {
                int refCnt = pendingReferences.get();
 
                checkState(refCnt != -1, "Partition released.");
@@ -370,7 +370,7 @@ public class ResultPartition implements BufferPoolOwner {
 
                checkElementIndex(index, subpartitions.length, "Subpartition 
not found.");
 
-               ResultSubpartitionView readView = 
subpartitions[index].createReadView(bufferProvider, availabilityListener);
+               ResultSubpartitionView readView = 
subpartitions[index].createReadView(availabilityListener);
 
                LOG.debug("Created {}", readView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 8ad3e34..f681548 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -22,7 +22,6 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Table;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +65,6 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
        public ResultSubpartitionView createSubpartitionView(
                        ResultPartitionID partitionId,
                        int subpartitionIndex,
-                       BufferProvider bufferProvider,
                        BufferAvailabilityListener availabilityListener) throws 
IOException {
 
                synchronized (registeredPartitions) {
@@ -79,7 +77,7 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
 
                        LOG.debug("Requesting subpartition {} of {}.", 
subpartitionIndex, partition);
 
-                       return 
partition.createSubpartitionView(subpartitionIndex, bufferProvider, 
availabilityListener);
+                       return 
partition.createSubpartitionView(subpartitionIndex, availabilityListener);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 3fbfd49..db72d63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
 import java.io.IOException;
 
 public interface ResultPartitionProvider {
@@ -30,7 +28,6 @@ public interface ResultPartitionProvider {
        ResultSubpartitionView createSubpartitionView(
                        ResultPartitionID partitionId,
                        int index,
-                       BufferProvider bufferProvider,
                        BufferAvailabilityListener availabilityListener) throws 
IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index d3cd887..3b4e3c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 
 import java.io.IOException;
 
@@ -77,7 +76,7 @@ public abstract class ResultSubpartition {
 
        abstract public void release() throws IOException;
 
-       abstract public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException;
+       abstract public ResultSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException;
 
        abstract int releaseMemory() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index a578188..11c6d16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,7 +163,7 @@ class SpillableSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public ResultSubpartitionView createReadView(BufferProvider 
bufferProvider, BufferAvailabilityListener availabilityListener) throws 
IOException {
+       public ResultSubpartitionView createReadView(BufferAvailabilityListener 
availabilityListener) throws IOException {
                synchronized (buffers) {
                        if (!isFinished) {
                                throw new IllegalStateException("Subpartition 
has not been finished yet, " +
@@ -180,7 +179,7 @@ class SpillableSubpartition extends ResultSubpartition {
                        if (spillWriter != null) {
                                readView = new SpilledSubpartitionView(
                                        this,
-                                       bufferProvider.getMemorySegmentSize(),
+                                       
parent.getBufferProvider().getMemorySegmentSize(),
                                        spillWriter,
                                        getTotalNumberOfBuffers(),
                                        availabilityListener);
@@ -189,7 +188,7 @@ class SpillableSubpartition extends ResultSubpartition {
                                        this,
                                        buffers,
                                        ioManager,
-                                       bufferProvider.getMemorySegmentSize(),
+                                       
parent.getBufferProvider().getMemorySegmentSize(),
                                        availabilityListener);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 4e14e93..3ade2f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -113,7 +113,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
                                try {
                                        ResultSubpartitionView subpartitionView 
= partitionManager.createSubpartitionView(
-                                               partitionId, subpartitionIndex, 
inputGate.getBufferProvider(), this);
+                                               partitionId, subpartitionIndex, 
this);
 
                                        if (subpartitionView == null) {
                                                throw new IOException("Error 
requesting subpartition.");

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index a2f866a..8b4fc0e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -78,18 +77,18 @@ public class CancelPartitionRequestTest {
                        final ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
 
                        // Return infinite subpartition
-                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferAvailabilityListener.class)))
                                .thenAnswer(new 
Answer<ResultSubpartitionView>() {
                                        @Override
                                        public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                               BufferAvailabilityListener 
listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+                                               BufferAvailabilityListener 
listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2];
                                                
listener.notifyBuffersAvailable(Long.MAX_VALUE);
                                                return view;
                                        }
                                });
 
                        PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
-                                       partitions, 
mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+                                       partitions, 
mock(TaskEventDispatcher.class));
 
                        serverAndClient = initServerAndClient(protocol);
 
@@ -129,18 +128,18 @@ public class CancelPartitionRequestTest {
                        final ResultSubpartitionView view = spy(new 
InfiniteSubpartitionView(outboundBuffers, sync));
 
                        // Return infinite subpartition
-                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                       when(partitions.createSubpartitionView(eq(pid), eq(0), 
any(BufferAvailabilityListener.class)))
                                        .thenAnswer(new 
Answer<ResultSubpartitionView>() {
                                                @Override
                                                public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                                       
BufferAvailabilityListener listener = (BufferAvailabilityListener) 
invocationOnMock.getArguments()[3];
+                                                       
BufferAvailabilityListener listener = (BufferAvailabilityListener) 
invocationOnMock.getArguments()[2];
                                                        
listener.notifyBuffersAvailable(Long.MAX_VALUE);
                                                        return view;
                                                }
                                        });
 
                        PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
-                                       partitions, 
mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+                                       partitions, 
mock(TaskEventDispatcher.class));
 
                        serverAndClient = initServerAndClient(protocol);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 22e7754..b4fc46f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -29,7 +29,6 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -85,8 +84,7 @@ public class ClientTransportErrorHandlingTest {
                        public ChannelHandler[] getClientChannelHandlers() {
                                return new PartitionRequestProtocol(
                                                
mock(ResultPartitionProvider.class),
-                                               mock(TaskEventDispatcher.class),
-                                               
mock(NetworkBufferPool.class)).getClientChannelHandlers();
+                                               
mock(TaskEventDispatcher.class)).getClientChannelHandlers();
                        }
                };
 
@@ -235,8 +233,7 @@ public class ClientTransportErrorHandlingTest {
                        public ChannelHandler[] getClientChannelHandlers() {
                                return new PartitionRequestProtocol(
                                                
mock(ResultPartitionProvider.class),
-                                               mock(TaskEventDispatcher.class),
-                                               
mock(NetworkBufferPool.class)).getClientChannelHandlers();
+                                               
mock(TaskEventDispatcher.class)).getClientChannelHandlers();
                        }
                };
 
@@ -383,8 +380,7 @@ public class ClientTransportErrorHandlingTest {
        private EmbeddedChannel createEmbeddedChannel() {
                PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
                                mock(ResultPartitionProvider.class),
-                               mock(TaskEventDispatcher.class),
-                               mock(NetworkBufferPool.class));
+                               mock(TaskEventDispatcher.class));
 
                return new EmbeddedChannel(protocol.getClientChannelHandlers());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 8ab572e..77de6bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -23,7 +23,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.util.NetUtils;
 import org.junit.Test;
@@ -60,8 +59,7 @@ public class NettyConnectionManagerTest {
 
                connectionManager.start(
                                mock(ResultPartitionProvider.class),
-                               mock(TaskEventDispatcher.class),
-                               mock(NetworkBufferPool.class));
+                               mock(TaskEventDispatcher.class));
 
                assertEquals(numberOfSlots, 
connectionManager.getBufferPool().getNumberOfArenas());
 
@@ -129,8 +127,7 @@ public class NettyConnectionManagerTest {
 
                connectionManager.start(
                                mock(ResultPartitionProvider.class),
-                               mock(TaskEventDispatcher.class),
-                               mock(NetworkBufferPool.class));
+                               mock(TaskEventDispatcher.class));
 
                assertEquals(numberOfArenas, 
connectionManager.getBufferPool().getNumberOfArenas());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 7224e96..b969b1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -43,7 +42,6 @@ public class PartitionRequestQueueTest {
 
                ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
                ResultPartitionID rpid = new ResultPartitionID();
-               BufferProvider bufferProvider = mock(BufferProvider.class);
 
                ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
                when(view.isReleased()).thenReturn(true);
@@ -52,13 +50,12 @@ public class PartitionRequestQueueTest {
                when(partitionProvider.createSubpartitionView(
                        eq(rpid),
                        eq(0),
-                       eq(bufferProvider),
                        
any(BufferAvailabilityListener.class))).thenReturn(view);
 
                EmbeddedChannel ch = new EmbeddedChannel(queue);
 
                SequenceNumberingViewReader seqView = new 
SequenceNumberingViewReader(new InputChannelID(), queue);
-               seqView.requestSubpartitionView(partitionProvider, rpid, 0, 
bufferProvider);
+               seqView.requestSubpartitionView(partitionProvider, rpid, 0);
 
                // Enqueue the erroneous view
                queue.notifyReaderNonEmpty(seqView);

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1c3557e..3c4ebb3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -63,11 +61,11 @@ public class ServerTransportErrorHandlingTest {
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
                when(partitionManager
-                       .createSubpartitionView(any(ResultPartitionID.class), 
anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                       .createSubpartitionView(any(ResultPartitionID.class), 
anyInt(), any(BufferAvailabilityListener.class)))
                        .thenAnswer(new Answer<ResultSubpartitionView>() {
                                @Override
                                public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                       BufferAvailabilityListener listener = 
(BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+                                       BufferAvailabilityListener listener = 
(BufferAvailabilityListener) invocationOnMock.getArguments()[2];
                                        
listener.notifyBuffersAvailable(Long.MAX_VALUE);
                                        return new 
CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
                                }
@@ -78,8 +76,7 @@ public class ServerTransportErrorHandlingTest {
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new PartitionRequestProtocol(
                                        partitionManager,
-                                       mock(TaskEventDispatcher.class),
-                                       
mock(NetworkBufferPool.class)).getServerChannelHandlers();
+                                       
mock(TaskEventDispatcher.class)).getServerChannelHandlers();
                        }
 
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index e292576..a327838 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,14 +59,14 @@ class InputChannelTestUtils {
 
                        @Override
                        public ResultSubpartitionView answer(InvocationOnMock 
invocation) throws Throwable {
-                               BufferAvailabilityListener channel = 
(BufferAvailabilityListener) invocation.getArguments()[3];
-                               return sources[num++].createReadView(null, 
channel);
+                               BufferAvailabilityListener channel = 
(BufferAvailabilityListener) invocation.getArguments()[2];
+                               return sources[num++].createReadView(channel);
                        }
                };
 
                ResultPartitionManager manager = 
mock(ResultPartitionManager.class);
                when(manager.createSubpartitionView(
-                               any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                               any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class)))
                                .thenAnswer(viewCreator);
 
                return manager;

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index e3200d1..de1e8a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -70,14 +70,14 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                final PipelinedSubpartition subpartition = createSubpartition();
 
                // Successful request
-               assertNotNull(subpartition.createReadView(null, new 
BufferAvailabilityListener() {
+               assertNotNull(subpartition.createReadView(new 
BufferAvailabilityListener() {
                        @Override
                        public void notifyBuffersAvailable(long numBuffers) {
                        }
                }));
 
                try {
-                       subpartition.createReadView(null, new 
BufferAvailabilityListener() {
+                       subpartition.createReadView(new 
BufferAvailabilityListener() {
                                @Override
                                public void notifyBuffersAvailable(long 
numBuffers) {
                                }
@@ -94,7 +94,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
 
-               ResultSubpartitionView view = subpartition.createReadView(null, 
listener);
+               ResultSubpartitionView view = 
subpartition.createReadView(listener);
 
                // Empty => should return null
                assertNull(view.getNextBuffer());
@@ -221,7 +221,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                final PipelinedSubpartition subpartition = createSubpartition();
 
                TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
-               final PipelinedSubpartitionView view = 
subpartition.createReadView(null, consumer);
+               final PipelinedSubpartitionView view = 
subpartition.createReadView(consumer);
                consumer.setSubpartitionView(view);
 
                Future<Boolean> producerResult = executorService.submit(

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b53ef68..2b356a8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -26,8 +26,8 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -67,8 +67,12 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        }
 
        @Override
-       ResultSubpartition createSubpartition() {
-               return new SpillableSubpartition(0, 
mock(ResultPartition.class), ioManager);
+       SpillableSubpartition createSubpartition() {
+               ResultPartition parent = mock(ResultPartition.class);
+               BufferProvider bufferProvider = mock(BufferProvider.class);
+               when(parent.getBufferProvider()).thenReturn(bufferProvider);
+               when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 
1024);
+               return new SpillableSubpartition(0, parent, ioManager);
        }
 
        /**
@@ -138,14 +142,13 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        @Test
        public void testReleasePartitionAndGetNext() throws Exception {
                // Create partition and add some buffers
-               SpillableSubpartition partition = new SpillableSubpartition(
-                       0, mock(ResultPartition.class), ioManager);
+               SpillableSubpartition partition = createSubpartition();
 
                partition.finish();
 
                // Create the read view
                ResultSubpartitionView readView = spy(partition
-                       .createReadView(new TestInfiniteBufferProvider(), new 
BufferAvailabilityListener() {
+                       .createReadView(new BufferAvailabilityListener() {
                                @Override
                                public void notifyBuffersAvailable(long 
numBuffers) {
 
@@ -168,11 +171,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
         */
        @Test
        public void testConsumeSpilledPartition() throws Exception {
-               ResultPartition parent = mock(ResultPartition.class);
-               SpillableSubpartition partition = new SpillableSubpartition(
-                       0,
-                       parent,
-                       ioManager);
+               SpillableSubpartition partition = createSubpartition();
 
                Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
                buffer.retain();
@@ -187,7 +186,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                partition.finish();
 
                BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
-               SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(new TestInfiniteBufferProvider(), listener);
+               SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
 
                verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
@@ -216,11 +215,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
         */
        @Test
        public void testConsumeSpillablePartitionSpilledDuringConsume() throws 
Exception {
-               ResultPartition parent = mock(ResultPartition.class);
-               SpillableSubpartition partition = new SpillableSubpartition(
-                       0,
-                       parent,
-                       ioManager);
+               SpillableSubpartition partition = createSubpartition();
 
                Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
                buffer.retain();
@@ -232,7 +227,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                partition.finish();
 
                AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(new TestInfiniteBufferProvider(), listener);
+               SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(listener);
 
                // Initial notification
                assertEquals(1, listener.getNumNotifiedBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 14942bc..800542e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -91,11 +90,9 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                partition.add(buffer);
                partition.finish();
 
-               TestInfiniteBufferProvider buffers = new 
TestInfiniteBufferProvider();
-
                // Create the view
                BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
-               ResultSubpartitionView view = partition.createReadView(buffers, 
listener);
+               ResultSubpartitionView view = 
partition.createReadView(listener);
 
                // The added buffer and end-of-partition event
                assertNotNull(view.getNextBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 18c3038..fe819a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -198,7 +198,7 @@ public class LocalInputChannelTest {
                LocalInputChannel ch = createLocalInputChannel(inputGate, 
partitionManager, backoff);
 
                when(partitionManager
-                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
+                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), any(BufferAvailabilityListener.class)))
                                .thenThrow(new 
PartitionNotFoundException(ch.partitionId));
 
                Timer timer = mock(Timer.class);
@@ -214,7 +214,7 @@ public class LocalInputChannelTest {
                // Initial request
                ch.requestSubpartition(0);
                verify(partitionManager)
-                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
+                               .createSubpartitionView(eq(ch.partitionId), 
eq(0), any(BufferAvailabilityListener.class));
 
                // Request subpartition and verify that the actual requests are 
delayed.
                for (long expected : expectedDelays) {
@@ -241,7 +241,7 @@ public class LocalInputChannelTest {
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager
-                               
.createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+                               
.createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class)))
                                .thenReturn(view);
 
                SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -296,7 +296,6 @@ public class LocalInputChannelTest {
                        .createSubpartitionView(
                                any(ResultPartitionID.class),
                                anyInt(),
-                               any(BufferProvider.class),
                                any(BufferAvailabilityListener.class)))
                        .thenAnswer(new Answer<ResultSubpartitionView>() {
                                @Override
@@ -360,7 +359,6 @@ public class LocalInputChannelTest {
                when(partitionManager.createSubpartitionView(
                                any(ResultPartitionID.class),
                                anyInt(),
-                               any(BufferProvider.class),
                                
any(BufferAvailabilityListener.class))).thenReturn(reader);
 
                LocalInputChannel channel = new LocalInputChannel(

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2d1b4b2..737f17b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -128,7 +127,6 @@ public class SingleInputGateTest {
                when(partitionManager.createSubpartitionView(
                        any(ResultPartitionID.class),
                        anyInt(),
-                       any(BufferProvider.class),
                        
any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
                // Setup reader with one local and one unknown input channel
@@ -163,7 +161,7 @@ public class SingleInputGateTest {
                inputGate.requestPartitions();
 
                // Only the local channel can request
-               verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
+               verify(partitionManager, 
times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
 
                // Send event backwards and initialize unknown channel 
afterwards
                final TaskEvent event = new TestTaskEvent();
@@ -175,7 +173,7 @@ public class SingleInputGateTest {
                // After the update, the pending event should be send to local 
channel
                inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(new 
ResultPartitionID(unknownPartitionId.getPartitionId(), 
unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
 
-               verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
+               verify(partitionManager, 
times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
                verify(taskEventDispatcher, 
times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
        }
 
@@ -215,7 +213,7 @@ public class SingleInputGateTest {
                        ResultPartitionLocation.createLocal()));
 
                verify(partitionManager, never()).createSubpartitionView(
-                       any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class), any(BufferAvailabilityListener.class));
+                       any(ResultPartitionID.class), anyInt(), 
any(BufferAvailabilityListener.class));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 9729b09..710e7df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends 
TestLogger {
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-                       
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840);
+                       
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 640);
                        
                        cluster = new LocalFlinkMiniCluster(config, false);
 

Reply via email to