[FLINK-6337][network] Remove the buffer provider from PartitionRequestServerHandler
The buffer provider is not needed and most likely a left over from prior refactorings. This closes #3785. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/464d6f55 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/464d6f55 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/464d6f55 Branch: refs/heads/table-retraction Commit: 464d6f553be79409bb25007255e1306884d09186 Parents: 3369578 Author: Zhijiang <wangzhijiang...@aliyun.com> Authored: Wed Apr 26 16:18:54 2017 +0800 Committer: Ufuk Celebi <u...@apache.org> Committed: Tue May 2 13:35:18 2017 +0200 ---------------------------------------------------------------------- .../runtime/io/network/ConnectionManager.java | 4 +-- .../io/network/LocalConnectionManager.java | 5 +--- .../runtime/io/network/NetworkEnvironment.java | 2 +- .../network/netty/NettyConnectionManager.java | 5 ++-- .../network/netty/PartitionRequestProtocol.java | 7 ++--- .../netty/PartitionRequestServerHandler.java | 19 ++---------- .../netty/SequenceNumberingViewReader.java | 5 +--- .../partition/PipelinedSubpartition.java | 3 +- .../io/network/partition/ResultPartition.java | 4 +-- .../partition/ResultPartitionManager.java | 4 +-- .../partition/ResultPartitionProvider.java | 3 -- .../network/partition/ResultSubpartition.java | 3 +- .../partition/SpillableSubpartition.java | 7 ++--- .../partition/consumer/LocalInputChannel.java | 2 +- .../netty/CancelPartitionRequestTest.java | 13 ++++---- .../netty/ClientTransportErrorHandlingTest.java | 10 ++----- .../netty/NettyConnectionManagerTest.java | 7 ++--- .../netty/PartitionRequestQueueTest.java | 5 +--- .../netty/ServerTransportErrorHandlingTest.java | 9 ++---- .../partition/InputChannelTestUtils.java | 7 ++--- .../partition/PipelinedSubpartitionTest.java | 8 ++--- .../partition/SpillableSubpartitionTest.java | 31 ++++++++------------ .../network/partition/SubpartitionTestBase.java | 5 +--- .../consumer/LocalInputChannelTest.java | 8 ++--- .../partition/consumer/SingleInputGateTest.java | 8 ++--- ...SuccessAfterNetworkBuffersFailureITCase.java | 2 +- 26 files changed, 62 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index 02deb9d..1225230 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -31,8 +30,7 @@ import java.io.IOException; public interface ConnectionManager { void start(ResultPartitionProvider partitionProvider, - TaskEventDispatcher taskEventDispatcher, - NetworkBufferPool networkbufferPool) throws IOException; + TaskEventDispatcher taskEventDispatcher) throws IOException; /** * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}. http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java index 4f51a56..bece6a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -29,9 +28,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; public class LocalConnectionManager implements ConnectionManager { @Override - public void start(ResultPartitionProvider partitionProvider, - TaskEventDispatcher taskEventDispatcher, - NetworkBufferPool networkbufferPool) { + public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 4d4b305..cc4cb77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -287,7 +287,7 @@ public class NetworkEnvironment { try { LOG.debug("Starting network connection manager"); - connectionManager.start(resultPartitionManager, taskEventDispatcher, networkBufferPool); + connectionManager.start(resultPartitionManager, taskEventDispatcher); } catch (IOException t) { throw new IOException("Failed to instantiate network connection manager.", t); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index abee2a8..fcf618a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import java.io.IOException; @@ -45,10 +44,10 @@ public class NettyConnectionManager implements ConnectionManager { } @Override - public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) + public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { PartitionRequestProtocol partitionRequestProtocol = - new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool); + new PartitionRequestProtocol(partitionProvider, taskEventDispatcher); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java index a39f085..d06a018 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty; import io.netty.channel.ChannelHandler; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder; @@ -34,12 +33,10 @@ class PartitionRequestProtocol implements NettyProtocol { private final ResultPartitionProvider partitionProvider; private final TaskEventDispatcher taskEventDispatcher; - private final NetworkBufferPool networkbufferPool; - PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) { + PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { this.partitionProvider = partitionProvider; this.taskEventDispatcher = taskEventDispatcher; - this.networkbufferPool = networkbufferPool; } // +-------------------------------------------------------------------+ @@ -77,7 +74,7 @@ class PartitionRequestProtocol implements NettyProtocol { public ChannelHandler[] getServerChannelHandlers() { PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( - partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool); + partitionProvider, taskEventDispatcher, queueOfPartitionQueues); return new ChannelHandler[] { messageEncoder, http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java index 6f56877..1bd05f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java @@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest; import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; @@ -47,36 +45,24 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes private final PartitionRequestQueue outboundQueue; - private final NetworkBufferPool networkBufferPool; - - private BufferPool bufferPool; - PartitionRequestServerHandler( ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, - PartitionRequestQueue outboundQueue, - NetworkBufferPool networkBufferPool) { + PartitionRequestQueue outboundQueue) { this.partitionProvider = partitionProvider; this.taskEventDispatcher = taskEventDispatcher; this.outboundQueue = outboundQueue; - this.networkBufferPool = networkBufferPool; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); - - bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); - - if (bufferPool != null) { - bufferPool.lazyDestroy(); - } } @Override @@ -100,8 +86,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes reader.requestSubpartitionView( partitionProvider, request.partitionId, - request.queueIndex, - bufferPool); + request.queueIndex); } catch (PartitionNotFoundException notFound) { respondWithError(ctx, notFound, request.receiverId); } http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 5036bb7..6d95ca5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -60,8 +59,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener { void requestSubpartitionView( ResultPartitionProvider partitionProvider, ResultPartitionID resultPartitionId, - int subPartitionIndex, - BufferProvider bufferProvider) throws IOException { + int subPartitionIndex) throws IOException { synchronized (requestLock) { if (subpartitionView == null) { @@ -72,7 +70,6 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener { this.subpartitionView = partitionProvider.createSubpartitionView( resultPartitionId, subPartitionIndex, - bufferProvider, this); } else { throw new IllegalStateException("Subpartition already requested"); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 4fd74e2..ed72b51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,7 +162,7 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { + public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { final int queueSize; synchronized (buffers) { http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index d207f60..9b02e4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -362,7 +362,7 @@ public class ResultPartition implements BufferPoolOwner { /** * Returns the requested subpartition. */ - public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { + public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException { int refCnt = pendingReferences.get(); checkState(refCnt != -1, "Partition released."); @@ -370,7 +370,7 @@ public class ResultPartition implements BufferPoolOwner { checkElementIndex(index, subpartitions.length, "Subpartition not found."); - ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener); + ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener); LOG.debug("Created {}", readView); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 8ad3e34..f681548 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -22,7 +22,6 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Table; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +65,6 @@ public class ResultPartitionManager implements ResultPartitionProvider { public ResultSubpartitionView createSubpartitionView( ResultPartitionID partitionId, int subpartitionIndex, - BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { synchronized (registeredPartitions) { @@ -79,7 +77,7 @@ public class ResultPartitionManager implements ResultPartitionProvider { LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition); - return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener); + return partition.createSubpartitionView(subpartitionIndex, availabilityListener); } } http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java index 3fbfd49..db72d63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; - import java.io.IOException; public interface ResultPartitionProvider { @@ -30,7 +28,6 @@ public interface ResultPartitionProvider { ResultSubpartitionView createSubpartitionView( ResultPartitionID partitionId, int index, - BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index d3cd887..3b4e3c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import java.io.IOException; @@ -77,7 +76,7 @@ public abstract class ResultSubpartition { abstract public void release() throws IOException; - abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException; + abstract public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException; abstract int releaseMemory() throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index a578188..11c6d16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,7 +163,7 @@ class SpillableSubpartition extends ResultSubpartition { } @Override - public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException { + public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { synchronized (buffers) { if (!isFinished) { throw new IllegalStateException("Subpartition has not been finished yet, " + @@ -180,7 +179,7 @@ class SpillableSubpartition extends ResultSubpartition { if (spillWriter != null) { readView = new SpilledSubpartitionView( this, - bufferProvider.getMemorySegmentSize(), + parent.getBufferProvider().getMemorySegmentSize(), spillWriter, getTotalNumberOfBuffers(), availabilityListener); @@ -189,7 +188,7 @@ class SpillableSubpartition extends ResultSubpartition { this, buffers, ioManager, - bufferProvider.getMemorySegmentSize(), + parent.getBufferProvider().getMemorySegmentSize(), availabilityListener); } http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 4e14e93..3ade2f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -113,7 +113,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit try { ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView( - partitionId, subpartitionIndex, inputGate.getBufferProvider(), this); + partitionId, subpartitionIndex, this); if (subpartitionView == null) { throw new IOException("Error requesting subpartition."); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index a2f866a..8b4fc0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -78,18 +77,18 @@ public class CancelPartitionRequestTest { final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); // Return infinite subpartition - when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class))) .thenAnswer(new Answer<ResultSubpartitionView>() { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { - BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2]; listener.notifyBuffersAvailable(Long.MAX_VALUE); return view; } }); PartitionRequestProtocol protocol = new PartitionRequestProtocol( - partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class)); + partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); @@ -129,18 +128,18 @@ public class CancelPartitionRequestTest { final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync)); // Return infinite subpartition - when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferAvailabilityListener.class))) .thenAnswer(new Answer<ResultSubpartitionView>() { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { - BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2]; listener.notifyBuffersAvailable(Long.MAX_VALUE); return view; } }); PartitionRequestProtocol protocol = new PartitionRequestProtocol( - partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class)); + partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 22e7754..b4fc46f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -29,7 +29,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; @@ -85,8 +84,7 @@ public class ClientTransportErrorHandlingTest { public ChannelHandler[] getClientChannelHandlers() { return new PartitionRequestProtocol( mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)).getClientChannelHandlers(); + mock(TaskEventDispatcher.class)).getClientChannelHandlers(); } }; @@ -235,8 +233,7 @@ public class ClientTransportErrorHandlingTest { public ChannelHandler[] getClientChannelHandlers() { return new PartitionRequestProtocol( mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)).getClientChannelHandlers(); + mock(TaskEventDispatcher.class)).getClientChannelHandlers(); } }; @@ -383,8 +380,7 @@ public class ClientTransportErrorHandlingTest { private EmbeddedChannel createEmbeddedChannel() { PartitionRequestProtocol protocol = new PartitionRequestProtocol( mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)); + mock(TaskEventDispatcher.class)); return new EmbeddedChannel(protocol.getClientChannelHandlers()); } http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java index 8ab572e..77de6bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -23,7 +23,6 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import org.apache.flink.util.NetUtils; import org.junit.Test; @@ -60,8 +59,7 @@ public class NettyConnectionManagerTest { connectionManager.start( mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)); + mock(TaskEventDispatcher.class)); assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas()); @@ -129,8 +127,7 @@ public class NettyConnectionManagerTest { connectionManager.start( mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)); + mock(TaskEventDispatcher.class)); assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas()); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 7224e96..b969b1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.netty; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; @@ -43,7 +42,6 @@ public class PartitionRequestQueueTest { ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); ResultPartitionID rpid = new ResultPartitionID(); - BufferProvider bufferProvider = mock(BufferProvider.class); ResultSubpartitionView view = mock(ResultSubpartitionView.class); when(view.isReleased()).thenReturn(true); @@ -52,13 +50,12 @@ public class PartitionRequestQueueTest { when(partitionProvider.createSubpartitionView( eq(rpid), eq(0), - eq(bufferProvider), any(BufferAvailabilityListener.class))).thenReturn(view); EmbeddedChannel ch = new EmbeddedChannel(queue); SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue); - seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider); + seqView.requestSubpartitionView(partitionProvider, rpid, 0); // Enqueue the erroneous view queue.notifyReaderNonEmpty(seqView); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 1c3557e..3c4ebb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; @@ -63,11 +61,11 @@ public class ServerTransportErrorHandlingTest { final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager - .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class))) .thenAnswer(new Answer<ResultSubpartitionView>() { @Override public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { - BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3]; + BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[2]; listener.notifyBuffersAvailable(Long.MAX_VALUE); return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync); } @@ -78,8 +76,7 @@ public class ServerTransportErrorHandlingTest { public ChannelHandler[] getServerChannelHandlers() { return new PartitionRequestProtocol( partitionManager, - mock(TaskEventDispatcher.class), - mock(NetworkBufferPool.class)).getServerChannelHandlers(); + mock(TaskEventDispatcher.class)).getServerChannelHandlers(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java index e292576..a327838 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -60,14 +59,14 @@ class InputChannelTestUtils { @Override public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwable { - BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[3]; - return sources[num++].createReadView(null, channel); + BufferAvailabilityListener channel = (BufferAvailabilityListener) invocation.getArguments()[2]; + return sources[num++].createReadView(channel); } }; ResultPartitionManager manager = mock(ResultPartitionManager.class); when(manager.createSubpartitionView( - any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class))) .thenAnswer(viewCreator); return manager; http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index e3200d1..de1e8a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -70,14 +70,14 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { final PipelinedSubpartition subpartition = createSubpartition(); // Successful request - assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() { + assertNotNull(subpartition.createReadView(new BufferAvailabilityListener() { @Override public void notifyBuffersAvailable(long numBuffers) { } })); try { - subpartition.createReadView(null, new BufferAvailabilityListener() { + subpartition.createReadView(new BufferAvailabilityListener() { @Override public void notifyBuffersAvailable(long numBuffers) { } @@ -94,7 +94,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); - ResultSubpartitionView view = subpartition.createReadView(null, listener); + ResultSubpartitionView view = subpartition.createReadView(listener); // Empty => should return null assertNull(view.getNextBuffer()); @@ -221,7 +221,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { final PipelinedSubpartition subpartition = createSubpartition(); TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback); - final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer); + final PipelinedSubpartitionView view = subpartition.createReadView(consumer); consumer.setSubpartitionView(view); Future<Boolean> producerResult = executorService.submit( http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index b53ef68..2b356a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -26,8 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.junit.AfterClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -67,8 +67,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } @Override - ResultSubpartition createSubpartition() { - return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager); + SpillableSubpartition createSubpartition() { + ResultPartition parent = mock(ResultPartition.class); + BufferProvider bufferProvider = mock(BufferProvider.class); + when(parent.getBufferProvider()).thenReturn(bufferProvider); + when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 1024); + return new SpillableSubpartition(0, parent, ioManager); } /** @@ -138,14 +142,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { @Test public void testReleasePartitionAndGetNext() throws Exception { // Create partition and add some buffers - SpillableSubpartition partition = new SpillableSubpartition( - 0, mock(ResultPartition.class), ioManager); + SpillableSubpartition partition = createSubpartition(); partition.finish(); // Create the read view ResultSubpartitionView readView = spy(partition - .createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() { + .createReadView(new BufferAvailabilityListener() { @Override public void notifyBuffersAvailable(long numBuffers) { @@ -168,11 +171,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { */ @Test public void testConsumeSpilledPartition() throws Exception { - ResultPartition parent = mock(ResultPartition.class); - SpillableSubpartition partition = new SpillableSubpartition( - 0, - parent, - ioManager); + SpillableSubpartition partition = createSubpartition(); Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); buffer.retain(); @@ -187,7 +186,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { partition.finish(); BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); - SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener); + SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); @@ -216,11 +215,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { */ @Test public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception { - ResultPartition parent = mock(ResultPartition.class); - SpillableSubpartition partition = new SpillableSubpartition( - 0, - parent, - ioManager); + SpillableSubpartition partition = createSubpartition(); Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); buffer.retain(); @@ -232,7 +227,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { partition.finish(); AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); - SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener); + SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); // Initial notification assertEquals(1, listener.getNumNotifiedBuffers()); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 14942bc..800542e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestBufferFactory; -import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -91,11 +90,9 @@ public abstract class SubpartitionTestBase extends TestLogger { partition.add(buffer); partition.finish(); - TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider(); - // Create the view BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); - ResultSubpartitionView view = partition.createReadView(buffers, listener); + ResultSubpartitionView view = partition.createReadView(listener); // The added buffer and end-of-partition event assertNotNull(view.getNextBuffer()); http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 18c3038..fe819a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -198,7 +198,7 @@ public class LocalInputChannelTest { LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff); when(partitionManager - .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class))) + .createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class))) .thenThrow(new PartitionNotFoundException(ch.partitionId)); Timer timer = mock(Timer.class); @@ -214,7 +214,7 @@ public class LocalInputChannelTest { // Initial request ch.requestSubpartition(0); verify(partitionManager) - .createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)); + .createSubpartitionView(eq(ch.partitionId), eq(0), any(BufferAvailabilityListener.class)); // Request subpartition and verify that the actual requests are delayed. for (long expected : expectedDelays) { @@ -241,7 +241,7 @@ public class LocalInputChannelTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); when(partitionManager - .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class))) + .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class))) .thenReturn(view); SingleInputGate inputGate = mock(SingleInputGate.class); @@ -296,7 +296,6 @@ public class LocalInputChannelTest { .createSubpartitionView( any(ResultPartitionID.class), anyInt(), - any(BufferProvider.class), any(BufferAvailabilityListener.class))) .thenAnswer(new Answer<ResultSubpartitionView>() { @Override @@ -360,7 +359,6 @@ public class LocalInputChannelTest { when(partitionManager.createSubpartitionView( any(ResultPartitionID.class), anyInt(), - any(BufferProvider.class), any(BufferAvailabilityListener.class))).thenReturn(reader); LocalInputChannel channel = new LocalInputChannel( http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 2d1b4b2..737f17b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -128,7 +127,6 @@ public class SingleInputGateTest { when(partitionManager.createSubpartitionView( any(ResultPartitionID.class), anyInt(), - any(BufferProvider.class), any(BufferAvailabilityListener.class))).thenReturn(iterator); // Setup reader with one local and one unknown input channel @@ -163,7 +161,7 @@ public class SingleInputGateTest { inputGate.requestPartitions(); // Only the local channel can request - verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); + verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); // Send event backwards and initialize unknown channel afterwards final TaskEvent event = new TestTaskEvent(); @@ -175,7 +173,7 @@ public class SingleInputGateTest { // After the update, the pending event should be send to local channel inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal())); - verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); + verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); } @@ -215,7 +213,7 @@ public class SingleInputGateTest { ResultPartitionLocation.createLocal())); verify(partitionManager, never()).createSubpartitionView( - any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)); + any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/464d6f55/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index 9729b09..710e7df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 640); cluster = new LocalFlinkMiniCluster(config, false);