[FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks

This also allows us to evaluate the output flushing interval as a benchmark
parameter.
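
For example (a minimal sketch based only on the API touched by this commit:
StreamNetworkBenchmarkEnvironment#setUp/createRecordWriter/tearDown; the
surrounding JMH harness in flink-benchmarks is assumed and not shown):

    StreamNetworkBenchmarkEnvironment<LongValue> env = new StreamNetworkBenchmarkEnvironment<>();
    env.setUp(1, 1); // one record writer, one outgoing channel
    // use a 100 ms output flushing interval as the benchmark parameter
    StreamRecordWriter<LongValue> writer = env.createRecordWriter(0, 100L);
    // ... drive the writer, e.g. via a LongRecordWriterThread ...
    env.tearDown();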

This closes #5259.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544c9703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544c9703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544c9703

Branch: refs/heads/master
Commit: 544c9703d97668b8d4a952501756db52156ff2ef
Parents: 6cfb758
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Dec 14 17:30:19 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Jan 9 16:10:51 2018 +0100

----------------------------------------------------------------------
 .../benchmark/LongRecordWriterThread.java       |  94 -------
 .../benchmark/NetworkBenchmarkEnvironment.java  | 278 -------------------
 .../benchmark/NetworkThroughputBenchmark.java   |  90 ------
 .../NetworkThroughputBenchmarkTests.java        |  74 -----
 .../io/network/benchmark/ReceiverThread.java    |  98 -------
 .../benchmark/SerializingLongReceiver.java      |  57 ----
 .../io/benchmark/LongRecordWriterThread.java    |  94 +++++++
 .../runtime/io/benchmark/ReceiverThread.java    |  98 +++++++
 .../io/benchmark/SerializingLongReceiver.java   |  57 ++++
 .../StreamNetworkBenchmarkEnvironment.java      | 257 ++++++++++++++++-
 .../StreamNetworkPointToPointBenchmark.java     |   3 +-
 .../StreamNetworkThroughputBenchmark.java       |  90 ++++++
 .../StreamNetworkThroughputBenchmarkTests.java  |  74 +++++
 13 files changed, 662 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
deleted file mode 100644
index 6018e55..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.types.LongValue;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
- * records.
- */
-public class LongRecordWriterThread extends CheckedThread {
-       private final RecordWriter<LongValue> recordWriter;
-
-       /**
-        * Future to wait on a definition of the number of records to send.
-        */
-       private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
-
-       private volatile boolean running = true;
-
-       public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
-               this.recordWriter = checkNotNull(recordWriter);
-       }
-
-       public void shutdown() {
-               running = false;
-               recordsToSend.complete(0L);
-       }
-
-       /**
-        * Initializes the record writer thread with this many numbers to send.
-        *
-        * <p>If the thread was already started, it may now continue.
-        *
-        * @param records
-        *              number of records to send
-        */
-       public synchronized void setRecordsToSend(long records) {
-               checkState(!recordsToSend.isDone());
-               recordsToSend.complete(records);
-       }
-
-       private synchronized CompletableFuture<Long> getRecordsToSend() {
-               return recordsToSend;
-       }
-
-       private synchronized void finishSendingRecords() {
-               recordsToSend = new CompletableFuture<>();
-       }
-
-       @Override
-       public void go() throws Exception {
-               while (running) {
-                       sendRecords(getRecordsToSend().get());
-               }
-       }
-
-       private void sendRecords(long records) throws IOException, InterruptedException {
-               LongValue value = new LongValue(0);
-
-               for (int i = 1; i < records; i++) {
-                       recordWriter.emit(value);
-               }
-               value.setValue(records);
-               recordWriter.broadcastEmit(value);
-               recordWriter.flush();
-
-               finishSendingRecords();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
deleted file mode 100644
index ff06187..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-
-import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
-
-/**
- * Context for network benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
-
-       private static final int BUFFER_SIZE = 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
-
-       private static final int NUM_SLOTS_AND_THREADS = 1;
-
-       private static final InetAddress LOCAL_ADDRESS;
-
-       static {
-               try {
-                       LOCAL_ADDRESS = InetAddress.getLocalHost();
-               } catch (UnknownHostException e) {
-                       throw new Error(e);
-               }
-       }
-
-       protected final JobID jobId = new JobID();
-       protected final IntermediateDataSetID dataSetID = new 
IntermediateDataSetID();
-       protected final ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID();
-
-       protected NetworkEnvironment senderEnv;
-       protected NetworkEnvironment receiverEnv;
-       protected IOManager ioManager;
-
-       protected int channels;
-
-       protected ResultPartitionID[] partitionIds;
-
-       public void setUp(int writers, int channels) throws Exception {
-               this.channels = channels;
-               this.partitionIds = new ResultPartitionID[writers];
-
-               int bufferPoolSize = Math.max(2048, writers * channels * 4);
-               senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
-               receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
-               ioManager = new IOManagerAsync();
-
-               senderEnv.start();
-               receiverEnv.start();
-
-               generatePartitionIds();
-       }
-
-       public void tearDown() {
-               suppressExceptions(senderEnv::shutdown);
-               suppressExceptions(receiverEnv::shutdown);
-               suppressExceptions(ioManager::shutdown);
-       }
-
-       public SerializingLongReceiver createReceiver() throws Exception {
-               TaskManagerLocation senderLocation = new TaskManagerLocation(
-                       ResourceID.generate(),
-                       LOCAL_ADDRESS,
-                       senderEnv.getConnectionManager().getDataPort());
-
-               InputGate receiverGate = createInputGate(
-                       jobId,
-                       dataSetID,
-                       executionAttemptID,
-                       senderLocation,
-                       receiverEnv,
-                       channels);
-
-               SerializingLongReceiver receiver = new 
SerializingLongReceiver(receiverGate, channels * partitionIds.length);
-
-               receiver.start();
-               return receiver;
-       }
-
-       public RecordWriter<T> createRecordWriter(int partitionIndex) throws 
Exception {
-               ResultPartitionWriter sender = createResultPartition(jobId, 
partitionIds[partitionIndex], senderEnv, channels);
-               return new RecordWriter<>(sender);
-       }
-
-       private void generatePartitionIds() throws Exception {
-               for (int writer = 0; writer < partitionIds.length; writer++) {
-                       partitionIds[writer] = new ResultPartitionID();
-               }
-       }
-
-       private NetworkEnvironment createNettyNetworkEnvironment(
-                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize) throws Exception {
-
-               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
-
-               final NettyConnectionManager nettyConnectionManager = new 
NettyConnectionManager(
-                       new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, 
NUM_SLOTS_AND_THREADS, new Configuration()));
-
-               return new NetworkEnvironment(
-                       bufferPool,
-                       nettyConnectionManager,
-                       new ResultPartitionManager(),
-                       new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
-                       IOMode.SYNC,
-                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
-                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
-                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
-                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
-       }
-
-       protected ResultPartitionWriter createResultPartition(
-                       JobID jobId,
-                       ResultPartitionID partitionId,
-                       NetworkEnvironment environment,
-                       int channels) throws Exception {
-
-               ResultPartition resultPartition = new ResultPartition(
-                       "sender task",
-                       new NoOpTaskActions(),
-                       jobId,
-                       partitionId,
-                       ResultPartitionType.PIPELINED_BOUNDED,
-                       channels,
-                       1,
-                       environment.getResultPartitionManager(),
-                       new NoOpResultPartitionConsumableNotifier(),
-                       ioManager,
-                       false);
-
-               // similar to NetworkEnvironment#registerTask()
-               int numBuffers = resultPartition.getNumberOfSubpartitions() *
-                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
-                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
-               BufferPool bufferPool = 
environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
-               resultPartition.registerBufferPool(bufferPool);
-
-               
environment.getResultPartitionManager().registerResultPartition(resultPartition);
-
-               return resultPartition;
-       }
-
-       private InputGate createInputGate(
-                       JobID jobId,
-                       IntermediateDataSetID dataSetID,
-                       ExecutionAttemptID executionAttemptID,
-                       final TaskManagerLocation senderLocation,
-                       NetworkEnvironment environment,
-                       final int channels) throws IOException {
-
-               InputGate[] gates = new InputGate[channels];
-               for (int channel = 0; channel < channels; ++channel) {
-                       int finalChannel = channel;
-                       InputChannelDeploymentDescriptor[] channelDescriptors = 
Arrays.stream(partitionIds)
-                               .map(partitionId -> new 
InputChannelDeploymentDescriptor(
-                                       partitionId,
-                                       
ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, 
finalChannel))))
-                               
.toArray(InputChannelDeploymentDescriptor[]::new);
-
-                       final InputGateDeploymentDescriptor gateDescriptor = 
new InputGateDeploymentDescriptor(
-                               dataSetID,
-                               ResultPartitionType.PIPELINED_BOUNDED,
-                               channel,
-                               channelDescriptors);
-
-                       SingleInputGate gate = SingleInputGate.create(
-                               "receiving task[" + channel + "]",
-                               jobId,
-                               executionAttemptID,
-                               gateDescriptor,
-                               environment,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
-
-                       // similar to NetworkEnvironment#registerTask()
-                       int numBuffers = gate.getNumberOfInputChannels() *
-                               
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
-                               
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
-
-                       BufferPool bufferPool =
-                               
environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(),
 numBuffers);
-
-                       gate.setBufferPool(bufferPool);
-                       gates[channel] = gate;
-               }
-
-               if (channels > 1) {
-                       return new UnionInputGate(gates);
-               } else {
-                       return gates[0];
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Mocks
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A dummy implementation of {@link TaskActions}. We implement this here rather than using Mockito
-        * to avoid a Mockito dependency in this benchmark class.
-        */
-       private static final class NoOpTaskActions implements TaskActions {
-
-               @Override
-               public void triggerPartitionProducerStateCheck(
-                       JobID jobId,
-                       IntermediateDataSetID intermediateDataSetId,
-                       ResultPartitionID resultPartitionId) {}
-
-               @Override
-               public void failExternally(Throwable cause) {}
-       }
-
-       private static final class NoOpResultPartitionConsumableNotifier 
implements ResultPartitionConsumableNotifier {
-
-               @Override
-               public void notifyPartitionConsumable(JobID j, 
ResultPartitionID p, TaskActions t) {}
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
deleted file mode 100644
index 799b7c3..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.types.LongValue;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Network throughput benchmarks executed by the external
- * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
- */
-public class NetworkThroughputBenchmark {
-       private static final long RECEIVER_TIMEOUT = 30_000;
-
-       private NetworkBenchmarkEnvironment<LongValue> environment;
-       private ReceiverThread receiver;
-       private LongRecordWriterThread[] writerThreads;
-
-       /**
-        * Executes the throughput benchmark with the given number of records.
-        *
-        * @param records
-        *              records to pass through the network stack
-        */
-       public void executeBenchmark(long records) throws Exception {
-               final LongValue value = new LongValue();
-               value.setValue(0);
-
-               long lastRecord = records / writerThreads.length;
-               CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
-
-               for (LongRecordWriterThread writerThread : writerThreads) {
-                       writerThread.setRecordsToSend(lastRecord);
-               }
-
-               recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
-       }
-
-       /**
-        * Initializes the throughput benchmark with the given parameters.
-        *
-        * @param recordWriters
-        *              number of senders, i.e.
-        *              {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
-        * @param channels
-        *              number of outgoing channels / receivers
-        */
-       public void setUp(int recordWriters, int channels) throws Exception {
-               environment = new NetworkBenchmarkEnvironment<>();
-               environment.setUp(recordWriters, channels);
-               receiver = environment.createReceiver();
-               writerThreads = new LongRecordWriterThread[recordWriters];
-               for (int writer = 0; writer < recordWriters; writer++) {
-                       writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
-                       writerThreads[writer].start();
-               }
-       }
-
-       /**
-        * Shuts down a benchmark previously set up via {@link #setUp}.
-        *
-        * <p>This will wait for all senders to finish but timeout with an exception after 5 seconds.
-        */
-       public void tearDown() throws Exception {
-               for (LongRecordWriterThread writerThread : writerThreads) {
-                       writerThread.shutdown();
-                       writerThread.sync(5000);
-               }
-               environment.tearDown();
-               receiver.shutdown();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
deleted file mode 100644
index c84743b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.junit.Test;
-
-/**
- * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
- */
-public class NetworkThroughputBenchmarkTests {
-       @Test
-       public void pointToPointBenchmark() throws Exception {
-               NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-               benchmark.setUp(1, 1);
-               try {
-                       benchmark.executeBenchmark(1_000);
-               }
-               finally {
-                       benchmark.tearDown();
-               }
-       }
-
-       @Test
-       public void pointToMultiPointBenchmark() throws Exception {
-               NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-               benchmark.setUp(1, 100);
-               try {
-                       benchmark.executeBenchmark(1_000);
-               }
-               finally {
-                       benchmark.tearDown();
-               }
-       }
-
-       @Test
-       public void multiPointToPointBenchmark() throws Exception {
-               NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-               benchmark.setUp(4, 1);
-               try {
-                       benchmark.executeBenchmark(1_000);
-               }
-               finally {
-                       benchmark.tearDown();
-               }
-       }
-
-       @Test
-       public void multiPointToMultiPointBenchmark() throws Exception {
-               NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
-               benchmark.setUp(4, 100);
-               try {
-                       benchmark.executeBenchmark(1_000);
-               }
-               finally {
-                       benchmark.tearDown();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
deleted file mode 100644
index be1c80f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.core.testutils.CheckedThread;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * This class waits for {@code expectedRepetitionsOfExpectedRecord} occurrences of the
- * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with the number of input channels.
- */
-public abstract class ReceiverThread extends CheckedThread {
-       protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
-
-       protected final int expectedRepetitionsOfExpectedRecord;
-
-       protected int expectedRecordCounter;
-       protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
-       protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
-
-       protected volatile boolean running;
-
-       ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
-               setName(this.getClass().getName());
-
-               this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
-               this.running = true;
-       }
-
-       public synchronized CompletableFuture<?> setExpectedRecord(long record) {
-               checkState(!expectedRecord.isDone());
-               checkState(!recordsProcessed.isDone());
-               expectedRecord.complete(record);
-               expectedRecordCounter = 0;
-               return recordsProcessed;
-       }
-
-       private synchronized CompletableFuture<Long> getExpectedRecord() {
-               return expectedRecord;
-       }
-
-       private synchronized void finishProcessingExpectedRecords() {
-               checkState(expectedRecord.isDone());
-               checkState(!recordsProcessed.isDone());
-
-               recordsProcessed.complete(null);
-               expectedRecord = new CompletableFuture<>();
-               recordsProcessed = new CompletableFuture<>();
-       }
-
-       @Override
-       public void go() throws Exception {
-               try {
-                       while (running) {
-                               readRecords(getExpectedRecord().get());
-                               finishProcessingExpectedRecords();
-                       }
-               }
-               catch (InterruptedException e) {
-                       if (running) {
-                               throw e;
-                       }
-               } catch (Exception e) {
-                       e.printStackTrace();
-               }
-       }
-
-       protected abstract void readRecords(long lastExpectedRecord) throws Exception;
-
-       public void shutdown() {
-               running = false;
-               interrupt();
-               expectedRecord.complete(0L);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
deleted file mode 100644
index 848c018..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.benchmark;
-
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.types.LongValue;
-
-/**
- * {@link ReceiverThread} that deserializes incoming messages.
- */
-public class SerializingLongReceiver extends ReceiverThread {
-
-       private final MutableRecordReader<LongValue> reader;
-
-       @SuppressWarnings("WeakerAccess")
-       public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
-               super(expectedRepetitionsOfExpectedRecord);
-               this.reader = new MutableRecordReader<>(
-                       inputGate,
-                       new String[]{
-                               EnvironmentInformation.getTemporaryFileDirectory()
-                       });
-       }
-
-       protected void readRecords(long lastExpectedRecord) throws Exception {
-               LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
-               final LongValue value = new LongValue();
-
-               while (running && reader.next(value)) {
-                       final long ts = value.getValue();
-                       if (ts == lastExpectedRecord) {
-                               expectedRecordCounter++;
-                               if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
-                                       break;
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..e6cc2d5
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
+ * records.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+       private final RecordWriter<LongValue> recordWriter;
+
+       /**
+        * Future to wait on a definition of the number of records to send.
+        */
+       private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+       private volatile boolean running = true;
+
+       public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+               this.recordWriter = checkNotNull(recordWriter);
+       }
+
+       public void shutdown() {
+               running = false;
+               recordsToSend.complete(0L);
+       }
+
+       /**
+        * Initializes the record writer thread with this many numbers to send.
+        *
+        * <p>If the thread was already started, it may now continue.
+        *
+        * @param records
+        *              number of records to send
+        */
+       public synchronized void setRecordsToSend(long records) {
+               checkState(!recordsToSend.isDone());
+               recordsToSend.complete(records);
+       }
+
+       private synchronized CompletableFuture<Long> getRecordsToSend() {
+               return recordsToSend;
+       }
+
+       private synchronized void finishSendingRecords() {
+               recordsToSend = new CompletableFuture<>();
+       }
+
+       @Override
+       public void go() throws Exception {
+               while (running) {
+                       sendRecords(getRecordsToSend().get());
+               }
+       }
+
+       private void sendRecords(long records) throws IOException, InterruptedException {
+               LongValue value = new LongValue(0);
+
+               for (int i = 1; i < records; i++) {
+                       recordWriter.emit(value);
+               }
+               value.setValue(records);
+               recordWriter.broadcastEmit(value);
+               recordWriter.flush();
+
+               finishSendingRecords();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..126efef
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with the number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+       protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
+
+       protected final int expectedRepetitionsOfExpectedRecord;
+
+       protected int expectedRecordCounter;
+       protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
+       protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
+
+       protected volatile boolean running;
+
+       ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+               setName(this.getClass().getName());
+
+               this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
+               this.running = true;
+       }
+
+       public synchronized CompletableFuture<?> setExpectedRecord(long record) {
+               checkState(!expectedRecord.isDone());
+               checkState(!recordsProcessed.isDone());
+               expectedRecord.complete(record);
+               expectedRecordCounter = 0;
+               return recordsProcessed;
+       }
+
+       private synchronized CompletableFuture<Long> getExpectedRecord() {
+               return expectedRecord;
+       }
+
+       private synchronized void finishProcessingExpectedRecords() {
+               checkState(expectedRecord.isDone());
+               checkState(!recordsProcessed.isDone());
+
+               recordsProcessed.complete(null);
+               expectedRecord = new CompletableFuture<>();
+               recordsProcessed = new CompletableFuture<>();
+       }
+
+       @Override
+       public void go() throws Exception {
+               try {
+                       while (running) {
+                               readRecords(getExpectedRecord().get());
+                               finishProcessingExpectedRecords();
+                       }
+               }
+               catch (InterruptedException e) {
+                       if (running) {
+                               throw e;
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+       }
+
+       protected abstract void readRecords(long lastExpectedRecord) throws Exception;
+
+       public void shutdown() {
+               running = false;
+               interrupt();
+               expectedRecord.complete(0L);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..580612c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserializes incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+       private final MutableRecordReader<LongValue> reader;
+
+       @SuppressWarnings("WeakerAccess")
+       public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+               super(expectedRepetitionsOfExpectedRecord);
+               this.reader = new MutableRecordReader<>(
+                       inputGate,
+                       new String[]{
+                               EnvironmentInformation.getTemporaryFileDirectory()
+                       });
+       }
+
+       protected void readRecords(long lastExpectedRecord) throws Exception {
+               LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+               final LongValue value = new LongValue();
+
+               while (running && reader.next(value)) {
+                       final long ts = value.getValue();
+                       if (ts == lastExpectedRecord) {
+                               expectedRecordCounter++;
+                               if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+                                       break;
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index acbbdf8..83508ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -18,23 +18,262 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
 /**
- * Context for stream network benchmarks executed by the external
+ * Context for network benchmarks executed by the external
  * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
  */
-public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> extends NetworkBenchmarkEnvironment<T> {
+public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+       private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+       private static final int NUM_SLOTS_AND_THREADS = 1;
+
+       private static final InetAddress LOCAL_ADDRESS;
+
+       static {
+               try {
+                       LOCAL_ADDRESS = InetAddress.getLocalHost();
+               } catch (UnknownHostException e) {
+                       throw new Error(e);
+               }
+       }
+
+       protected final JobID jobId = new JobID();
+       protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+       protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+
+       protected NetworkEnvironment senderEnv;
+       protected NetworkEnvironment receiverEnv;
+       protected IOManager ioManager;
+
+       protected int channels;
+
+       protected ResultPartitionID[] partitionIds;
+
+       public void setUp(int writers, int channels) throws Exception {
+               this.channels = channels;
+               this.partitionIds = new ResultPartitionID[writers];
+
+               int bufferPoolSize = Math.max(2048, writers * channels * 4);
+               senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+               receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+               ioManager = new IOManagerAsync();
+
+               senderEnv.start();
+               receiverEnv.start();
+
+               generatePartitionIds();
+       }
+
+       public void tearDown() {
+               suppressExceptions(senderEnv::shutdown);
+               suppressExceptions(receiverEnv::shutdown);
+               suppressExceptions(ioManager::shutdown);
+       }
+
+       public SerializingLongReceiver createReceiver() throws Exception {
+               TaskManagerLocation senderLocation = new TaskManagerLocation(
+                       ResourceID.generate(),
+                       LOCAL_ADDRESS,
+                       senderEnv.getConnectionManager().getDataPort());
+
+               InputGate receiverGate = createInputGate(
+                       jobId,
+                       dataSetID,
+                       executionAttemptID,
+                       senderLocation,
+                       receiverEnv,
+                       channels);
+
+               SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+               receiver.start();
+               return receiver;
+       }
+
+       public StreamRecordWriter<T> createRecordWriter(int partitionIndex, long flushTimeout) throws Exception {
+               ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
+               return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<T>(), flushTimeout);
+       }
+
+       private void generatePartitionIds() throws Exception {
+               for (int writer = 0; writer < partitionIds.length; writer++) {
+                       partitionIds[writer] = new ResultPartitionID();
+               }
+       }
+
+       private NetworkEnvironment createNettyNetworkEnvironment(
+                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize) throws Exception {
+
+               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+               final NettyConnectionManager nettyConnectionManager = new 
NettyConnectionManager(
+                       new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, 
NUM_SLOTS_AND_THREADS, new Configuration()));
+
+               return new NetworkEnvironment(
+                       bufferPool,
+                       nettyConnectionManager,
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       null,
+                       IOMode.SYNC,
+                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+       }
+
+       protected ResultPartitionWriter createResultPartition(
+                       JobID jobId,
+                       ResultPartitionID partitionId,
+                       NetworkEnvironment environment,
+                       int channels) throws Exception {
+
+               ResultPartition resultPartition = new ResultPartition(
+                       "sender task",
+                       new NoOpTaskActions(),
+                       jobId,
+                       partitionId,
+                       ResultPartitionType.PIPELINED_BOUNDED,
+                       channels,
+                       1,
+                       environment.getResultPartitionManager(),
+                       new NoOpResultPartitionConsumableNotifier(),
+                       ioManager,
+                       false);
+
+               // similar to NetworkEnvironment#registerTask()
+               int numBuffers = resultPartition.getNumberOfSubpartitions() *
+                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+               BufferPool bufferPool = 
environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+               resultPartition.registerBufferPool(bufferPool);
+
+               
environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+               return resultPartition;
+       }
+
+       private InputGate createInputGate(
+                       JobID jobId,
+                       IntermediateDataSetID dataSetID,
+                       ExecutionAttemptID executionAttemptID,
+                       final TaskManagerLocation senderLocation,
+                       NetworkEnvironment environment,
+                       final int channels) throws IOException {
+
+               InputGate[] gates = new InputGate[channels];
+               for (int channel = 0; channel < channels; ++channel) {
+                       int finalChannel = channel;
+                       InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
+                               .map(partitionId -> new InputChannelDeploymentDescriptor(
+                                       partitionId,
+                                       ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+                               .toArray(InputChannelDeploymentDescriptor[]::new);
+
+                       final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
+                               dataSetID,
+                               ResultPartitionType.PIPELINED_BOUNDED,
+                               channel,
+                               channelDescriptors);
+
+                       SingleInputGate gate = SingleInputGate.create(
+                               "receiving task[" + channel + "]",
+                               jobId,
+                               executionAttemptID,
+                               gateDescriptor,
+                               environment,
+                               new NoOpTaskActions(),
+                               UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+                       // similar to NetworkEnvironment#registerTask()
+                       int numBuffers = gate.getNumberOfInputChannels() *
+                               TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+                               TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+                       BufferPool bufferPool =
+                               environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
+
+                       gate.setBufferPool(bufferPool);
+                       gates[channel] = gate;
+               }
+
+               if (channels > 1) {
+                       return new UnionInputGate(gates);
+               } else {
+                       return gates[0];
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Mocks
+       // ------------------------------------------------------------------------
+
+       /**
+        * A dummy implementation of {@link TaskActions}. We implement it here instead of using a
+        * Mockito mock to avoid a Mockito dependency in this benchmark class.
+        */
+       private static final class NoOpTaskActions implements TaskActions {
+
+               @Override
+               public void triggerPartitionProducerStateCheck(
+                       JobID jobId,
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId) {}
+
+               @Override
+               public void failExternally(Throwable cause) {}
+       }
+
+       private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
 
-       public RecordWriter<T> createStreamRecordWriter(int partitionIndex, long flushTimeout)
-                       throws Exception {
-               ResultPartitionWriter sender =
-                       createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
-               return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout);
+               @Override
+               public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
        }
 }
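
For reference, the environment above is driven entirely through setUp(writers, channels),
createReceiver(), createRecordWriter(partitionIndex, flushTimeout) and tearDown(). A minimal
lifecycle sketch (not part of the commit), assuming the LongValue record type used by these
benchmarks and an illustrative flush timeout of 100, the value the tests below also use:

    // Sketch only: mirrors how StreamNetworkPointToPointBenchmark (below) wires the environment.
    StreamNetworkBenchmarkEnvironment<LongValue> environment =
            new StreamNetworkBenchmarkEnvironment<>();
    environment.setUp(1, 1);                                  // 1 record writer, 1 channel/receiver

    ReceiverThread receiver = environment.createReceiver();   // receives and counts incoming records
    RecordWriter<LongValue> writer = environment.createRecordWriter(0, 100); // 100 = assumed flush timeout

    // ... emit LongValue records via `writer`, then wait on `receiver` ...

    receiver.shutdown();
    environment.tearDown();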

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 9286485..843d3e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.io.benchmark;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.benchmark.ReceiverThread;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -74,7 +73,7 @@ public class StreamNetworkPointToPointBenchmark {
                environment.setUp(1, 1);
 
                receiver = environment.createReceiver();
-               recordWriter = environment.createStreamRecordWriter(0, flushTimeout);
+               recordWriter = environment.createRecordWriter(0, flushTimeout);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
new file mode 100644
index 0000000..3f41b00
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class StreamNetworkThroughputBenchmark {
+       private static final long RECEIVER_TIMEOUT = 30_000;
+
+       private StreamNetworkBenchmarkEnvironment<LongValue> environment;
+       private ReceiverThread receiver;
+       private LongRecordWriterThread[] writerThreads;
+
+       /**
+        * Executes the throughput benchmark with the given number of records.
+        *
+        * @param records
+        *              records to pass through the network stack
+        */
+       public void executeBenchmark(long records) throws Exception {
+               final LongValue value = new LongValue();
+               value.setValue(0);
+
+               long lastRecord = records / writerThreads.length;
+               CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+               for (LongRecordWriterThread writerThread : writerThreads) {
+                       writerThread.setRecordsToSend(lastRecord);
+               }
+
+               recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+       }
+
+       /**
+        * Initializes the throughput benchmark with the given parameters.
+        *
+        * @param recordWriters
+        *              number of senders, i.e.
+        *              {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+        * @param channels
+        *              number of outgoing channels / receivers
+        * @param flushTimeout
+        *              output flushing interval passed to the created record writers
+        */
+       public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
+               environment = new StreamNetworkBenchmarkEnvironment<>();
+               environment.setUp(recordWriters, channels);
+               receiver = environment.createReceiver();
+               writerThreads = new LongRecordWriterThread[recordWriters];
+               for (int writer = 0; writer < recordWriters; writer++) {
+                       writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout));
+                       writerThreads[writer].start();
+               }
+       }
+
+       /**
+        * Shuts down a benchmark previously set up via {@link #setUp}.
+        *
+        * <p>This will wait for all senders to finish but will time out with an exception after 5 seconds.
+        */
+       public void tearDown() throws Exception {
+               for (LongRecordWriterThread writerThread : writerThreads) {
+                       writerThread.shutdown();
+                       writerThread.sync(5000);
+               }
+               environment.tearDown();
+               receiver.shutdown();
+       }
+}
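
The class above is the entry point that the external flink-benchmarks project instantiates. A
minimal driver sketch (not part of the commit); the record count and the flush-timeout value of
100 below are illustrative placeholders, not measured or recommended settings:

    StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
    benchmark.setUp(4, 100, 100);              // 4 record writers, 100 channels, flush timeout 100
    try {
        benchmark.executeBenchmark(1_000_000); // push one million LongValue records through the stack
    } finally {
        benchmark.tearDown();                  // waits for the writer threads, then shuts the environment down
    }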

http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..8af8148
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
+ */
+public class StreamNetworkThroughputBenchmarkTests {
+       @Test
+       public void pointToPointBenchmark() throws Exception {
+               StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+               benchmark.setUp(1, 1, 100);
+               try {
+                       benchmark.executeBenchmark(1_000);
+               }
+               finally {
+                       benchmark.tearDown();
+               }
+       }
+
+       @Test
+       public void pointToMultiPointBenchmark() throws Exception {
+               StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+               benchmark.setUp(1, 100, 100);
+               try {
+                       benchmark.executeBenchmark(1_000);
+               }
+               finally {
+                       benchmark.tearDown();
+               }
+       }
+
+       @Test
+       public void multiPointToPointBenchmark() throws Exception {
+               StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+               benchmark.setUp(4, 1, 100);
+               try {
+                       benchmark.executeBenchmark(1_000);
+               }
+               finally {
+                       benchmark.tearDown();
+               }
+       }
+
+       @Test
+       public void multiPointToMultiPointBenchmark() throws Exception {
+               StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+               benchmark.setUp(4, 100, 100);
+               try {
+                       benchmark.executeBenchmark(1_000);
+               }
+               finally {
+                       benchmark.tearDown();
+               }
+       }
+}
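
The four tests above all fix the flush timeout at 100; since the timeout is now a setUp parameter,
a variation exercising a different flushing interval follows the same pattern. The value of 1 below
is an illustrative assumption, not part of the commit:

    @Test
    public void pointToPointBenchmarkWithLowFlushTimeout() throws Exception {
        StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
        benchmark.setUp(1, 1, 1); // 1 writer, 1 channel, flush timeout of 1
        try {
            benchmark.executeBenchmark(1_000);
        }
        finally {
            benchmark.tearDown();
        }
    }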
