[FLINK-8220][network-benchmarks] Define network benchmarks in Flink project


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8161911
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8161911
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8161911

Branch: refs/heads/master
Commit: c816191113d813156467f3e33856636ef0bcce38
Parents: 81d3e72
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Dec 7 10:03:32 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Jan 8 11:46:01 2018 +0100

----------------------------------------------------------------------
 .../benchmark/LongRecordWriterThread.java       |  94 +++++++
 .../benchmark/NetworkBenchmarkEnvironment.java  | 278 +++++++++++++++++++
 .../benchmark/NetworkThroughputBenchmark.java   |  90 ++++++
 .../NetworkThroughputBenchmarkTests.java        |  74 +++++
 .../io/network/benchmark/ReceiverThread.java    |  98 +++++++
 .../benchmark/SerializingLongReceiver.java      |  57 ++++
 6 files changed, 691 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
new file mode 100644
index 0000000..6018e55
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.types.LongValue;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Wrapping thread around {@link RecordWriter} that sends batches of {@link LongValue} records.
+ *
+ * <p>For a batch of <tt>records</tt> records, <tt>records - 1</tt> times <tt>LongValue(0)</tt> is
+ * emitted, followed by one record with the value <tt>records</tt> that is sent via
+ * {@link RecordWriter#broadcastEmit}.
+ */
+public class LongRecordWriterThread extends CheckedThread {
+       private final RecordWriter<LongValue> recordWriter;
+
+       /**
+        * Future to wait on a definition of the number of records to send.
+        */
+       private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
+
+       private volatile boolean running = true;
+
+       public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
+               this.recordWriter = checkNotNull(recordWriter);
+       }
+
+       /**
+        * Clears the running flag and completes the pending record-count future with <tt>0</tt>
+        * so that a thread blocked on it wakes up.
+        */
+       public void shutdown() {
+               running = false;
+               // read the field through the synchronized accessor, like every other access to it
+               getRecordsToSend().complete(0L);
+       }
+
+       /**
+        * Initializes the record writer thread with this many numbers to send.
+        *
+        * <p>If the thread was already started, it may now continue.
+        *
+        * @param records
+        *              number of records to send
+        */
+       public synchronized void setRecordsToSend(long records) {
+               checkState(!recordsToSend.isDone());
+               recordsToSend.complete(records);
+       }
+
+       private synchronized CompletableFuture<Long> getRecordsToSend() {
+               return recordsToSend;
+       }
+
+       /**
+        * Installs a fresh future so that the next batch size can be set.
+        */
+       private synchronized void finishSendingRecords() {
+               recordsToSend = new CompletableFuture<>();
+       }
+
+       @Override
+       public void go() throws Exception {
+               while (running) {
+                       sendRecords(getRecordsToSend().get());
+               }
+       }
+
+       private void sendRecords(long records) throws IOException, InterruptedException {
+               LongValue value = new LongValue(0);
+
+               // long counter: an int counter overflows and never reaches counts above Integer.MAX_VALUE
+               for (long i = 1; i < records; i++) {
+                       recordWriter.emit(value);
+               }
+               // the final record carries the batch size and is sent via broadcastEmit
+               value.setValue(records);
+               recordWriter.broadcastEmit(value);
+               recordWriter.flush();
+
+               finishSendingRecords();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
new file mode 100644
index 0000000..ff06187
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+
+/**
+ * Context for network benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
+
+       private static final int BUFFER_SIZE = 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+
+       private static final int NUM_SLOTS_AND_THREADS = 1;
+
+       private static final InetAddress LOCAL_ADDRESS;
+
+       static {
+               try {
+                       LOCAL_ADDRESS = InetAddress.getLocalHost();
+               } catch (UnknownHostException e) {
+                       throw new Error(e);
+               }
+       }
+
+       protected final JobID jobId = new JobID();
+       protected final IntermediateDataSetID dataSetID = new 
IntermediateDataSetID();
+       protected final ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID();
+
+       protected NetworkEnvironment senderEnv;
+       protected NetworkEnvironment receiverEnv;
+       protected IOManager ioManager;
+
+       protected int channels;
+
+       protected ResultPartitionID[] partitionIds;
+
+       public void setUp(int writers, int channels) throws Exception {
+               this.channels = channels;
+               this.partitionIds = new ResultPartitionID[writers];
+
+               int bufferPoolSize = Math.max(2048, writers * channels * 4);
+               senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+               receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+               ioManager = new IOManagerAsync();
+
+               senderEnv.start();
+               receiverEnv.start();
+
+               generatePartitionIds();
+       }
+
+       /**
+        * Shuts down the sender environment, the receiver environment and the I/O manager.
+        *
+        * <p>Components that were never created (for example because {@code setUp} failed
+        * half-way or was never called) are skipped. Each shutdown call is wrapped in
+        * {@code suppressExceptions}.
+        */
+       public void tearDown() {
+               // a bound method reference on a null field throws before suppressExceptions() is entered
+               if (senderEnv != null) {
+                       suppressExceptions(senderEnv::shutdown);
+               }
+               if (receiverEnv != null) {
+                       suppressExceptions(receiverEnv::shutdown);
+               }
+               if (ioManager != null) {
+                       suppressExceptions(ioManager::shutdown);
+               }
+       }
+
+       public SerializingLongReceiver createReceiver() throws Exception {
+               TaskManagerLocation senderLocation = new TaskManagerLocation(
+                       ResourceID.generate(),
+                       LOCAL_ADDRESS,
+                       senderEnv.getConnectionManager().getDataPort());
+
+               InputGate receiverGate = createInputGate(
+                       jobId,
+                       dataSetID,
+                       executionAttemptID,
+                       senderLocation,
+                       receiverEnv,
+                       channels);
+
+               SerializingLongReceiver receiver = new 
SerializingLongReceiver(receiverGate, channels * partitionIds.length);
+
+               receiver.start();
+               return receiver;
+       }
+
+       public RecordWriter<T> createRecordWriter(int partitionIndex) throws 
Exception {
+               ResultPartitionWriter sender = createResultPartition(jobId, 
partitionIds[partitionIndex], senderEnv, channels);
+               return new RecordWriter<>(sender);
+       }
+
+       /**
+        * Fills every slot of {@code partitionIds} with a newly created {@link ResultPartitionID}.
+        */
+       private void generatePartitionIds() throws Exception {
+               Arrays.setAll(partitionIds, index -> new ResultPartitionID());
+       }
+
+       private NetworkEnvironment createNettyNetworkEnvironment(
+                       @SuppressWarnings("SameParameterValue") int 
bufferPoolSize) throws Exception {
+
+               final NetworkBufferPool bufferPool = new 
NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+
+               final NettyConnectionManager nettyConnectionManager = new 
NettyConnectionManager(
+                       new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, 
NUM_SLOTS_AND_THREADS, new Configuration()));
+
+               return new NetworkEnvironment(
+                       bufferPool,
+                       nettyConnectionManager,
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       null,
+                       IOMode.SYNC,
+                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
+                       
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
+                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
+                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
+       }
+
+       protected ResultPartitionWriter createResultPartition(
+                       JobID jobId,
+                       ResultPartitionID partitionId,
+                       NetworkEnvironment environment,
+                       int channels) throws Exception {
+
+               ResultPartition resultPartition = new ResultPartition(
+                       "sender task",
+                       new NoOpTaskActions(),
+                       jobId,
+                       partitionId,
+                       ResultPartitionType.PIPELINED_BOUNDED,
+                       channels,
+                       1,
+                       environment.getResultPartitionManager(),
+                       new NoOpResultPartitionConsumableNotifier(),
+                       ioManager,
+                       false);
+
+               // similar to NetworkEnvironment#registerTask()
+               int numBuffers = resultPartition.getNumberOfSubpartitions() *
+                       
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+                       
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+               BufferPool bufferPool = 
environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
+               resultPartition.registerBufferPool(bufferPool);
+
+               
environment.getResultPartitionManager().registerResultPartition(resultPartition);
+
+               return resultPartition;
+       }
+
+       private InputGate createInputGate(
+                       JobID jobId,
+                       IntermediateDataSetID dataSetID,
+                       ExecutionAttemptID executionAttemptID,
+                       final TaskManagerLocation senderLocation,
+                       NetworkEnvironment environment,
+                       final int channels) throws IOException {
+
+               InputGate[] gates = new InputGate[channels];
+               for (int channel = 0; channel < channels; ++channel) {
+                       int finalChannel = channel;
+                       InputChannelDeploymentDescriptor[] channelDescriptors = 
Arrays.stream(partitionIds)
+                               .map(partitionId -> new 
InputChannelDeploymentDescriptor(
+                                       partitionId,
+                                       
ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, 
finalChannel))))
+                               
.toArray(InputChannelDeploymentDescriptor[]::new);
+
+                       final InputGateDeploymentDescriptor gateDescriptor = 
new InputGateDeploymentDescriptor(
+                               dataSetID,
+                               ResultPartitionType.PIPELINED_BOUNDED,
+                               channel,
+                               channelDescriptors);
+
+                       SingleInputGate gate = SingleInputGate.create(
+                               "receiving task[" + channel + "]",
+                               jobId,
+                               executionAttemptID,
+                               gateDescriptor,
+                               environment,
+                               new NoOpTaskActions(),
+                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+
+                       // similar to NetworkEnvironment#registerTask()
+                       int numBuffers = gate.getNumberOfInputChannels() *
+                               
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
+                               
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
+
+                       BufferPool bufferPool =
+                               
environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(),
 numBuffers);
+
+                       gate.setBufferPool(bufferPool);
+                       gates[channel] = gate;
+               }
+
+               if (channels > 1) {
+                       return new UnionInputGate(gates);
+               } else {
+                       return gates[0];
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Mocks
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A dummy implementation of the {@link TaskActions}. We implement this 
here rather than using Mockito
+        * to avoid using mockito in this benchmark class.
+        */
+       private static final class NoOpTaskActions implements TaskActions {
+
+               @Override
+               public void triggerPartitionProducerStateCheck(
+                       JobID jobId,
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId) {}
+
+               @Override
+               public void failExternally(Throwable cause) {}
+       }
+
+       private static final class NoOpResultPartitionConsumableNotifier 
implements ResultPartitionConsumableNotifier {
+
+               @Override
+               public void notifyPartitionConsumable(JobID j, 
ResultPartitionID p, TaskActions t) {}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
new file mode 100644
index 0000000..799b7c3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class NetworkThroughputBenchmark {
+       private static final long RECEIVER_TIMEOUT = 30_000;
+
+       private NetworkBenchmarkEnvironment<LongValue> environment;
+       private ReceiverThread receiver;
+       private LongRecordWriterThread[] writerThreads;
+
+       /**
+        * Executes the throughput benchmark with the given number of records.
+        *
+        * <p>The records are split evenly between the writers (integer division), and the call
+        * waits up to {@code RECEIVER_TIMEOUT} milliseconds for the receiver to report completion.
+        *
+        * @param records
+        *              records to pass through the network stack
+        */
+       public void executeBenchmark(long records) throws Exception {
+               long lastRecord = records / writerThreads.length;
+               // register the expectation with the receiver before any writer starts sending
+               CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
+
+               for (LongRecordWriterThread writerThread : writerThreads) {
+                       writerThread.setRecordsToSend(lastRecord);
+               }
+
+               recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+       }
+
+       /**
+        * Initializes the throughput benchmark with the given parameters.
+        *
+        * @param recordWriters
+        *              number of senders, i.e.
+        *              {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
+        * @param channels
+        *              number of outgoing channels / receivers
+        */
+       public void setUp(int recordWriters, int channels) throws Exception {
+               environment = new NetworkBenchmarkEnvironment<>();
+               environment.setUp(recordWriters, channels);
+               receiver = environment.createReceiver();
+               writerThreads = new LongRecordWriterThread[recordWriters];
+               for (int writer = 0; writer < recordWriters; writer++) {
+                       writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
+                       writerThreads[writer].start();
+               }
+       }
+
+       /**
+        * Shuts down a benchmark previously set up via {@link #setUp}.
+        *
+        * <p>This will wait for all senders to finish but timeout with an exception after 5 seconds
+        * per sender. The environment and the receiver are shut down even if a sender fails.
+        */
+       public void tearDown() throws Exception {
+               try {
+                       // signal every writer first so that a failing sync() cannot leave others running
+                       for (LongRecordWriterThread writerThread : writerThreads) {
+                               writerThread.shutdown();
+                       }
+                       for (LongRecordWriterThread writerThread : writerThreads) {
+                               writerThread.sync(5000);
+                       }
+               }
+               finally {
+                       try {
+                               environment.tearDown();
+                       }
+                       finally {
+                               receiver.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
new file mode 100644
index 0000000..c84743b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Runs {@link NetworkThroughputBenchmark} once for several combinations of writers and channels.
+ */
+public class NetworkThroughputBenchmarkTests {
+       @Test
+       public void pointToPointBenchmark() throws Exception {
+               runBenchmark(1, 1);
+       }
+
+       @Test
+       public void pointToMultiPointBenchmark() throws Exception {
+               runBenchmark(1, 100);
+       }
+
+       @Test
+       public void multiPointToPointBenchmark() throws Exception {
+               runBenchmark(4, 1);
+       }
+
+       @Test
+       public void multiPointToMultiPointBenchmark() throws Exception {
+               runBenchmark(4, 100);
+       }
+
+       /**
+        * Sets up a benchmark, pushes 1000 records through it and always tears it down afterwards.
+        *
+        * @param recordWriters
+        *              number of record writers passed to {@link NetworkThroughputBenchmark#setUp}
+        * @param channels
+        *              number of channels passed to {@link NetworkThroughputBenchmark#setUp}
+        */
+       private static void runBenchmark(int recordWriters, int channels) throws Exception {
+               NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
+               benchmark.setUp(recordWriters, channels);
+               try {
+                       benchmark.executeBenchmark(1_000);
+               }
+               finally {
+                       benchmark.tearDown();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
new file mode 100644
index 0000000..be1c80f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.core.testutils.CheckedThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of 
occurrences of the
+ * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is 
correlated with number of input channels.
+ */
+public abstract class ReceiverThread extends CheckedThread {
+       protected static final Logger LOG = 
LoggerFactory.getLogger(ReceiverThread.class);
+
+       protected final int expectedRepetitionsOfExpectedRecord;
+
+       protected int expectedRecordCounter;
+       protected CompletableFuture<Long> expectedRecord = new 
CompletableFuture<>();
+       protected CompletableFuture<?> recordsProcessed = new 
CompletableFuture<>();
+
+       protected volatile boolean running;
+
+       ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
+               setName(this.getClass().getName());
+
+               this.expectedRepetitionsOfExpectedRecord = 
expectedRepetitionsOfExpectedRecord;
+               this.running = true;
+       }
+
+       public synchronized CompletableFuture<?> setExpectedRecord(long record) 
{
+               checkState(!expectedRecord.isDone());
+               checkState(!recordsProcessed.isDone());
+               expectedRecord.complete(record);
+               expectedRecordCounter = 0;
+               return recordsProcessed;
+       }
+
+       private synchronized CompletableFuture<Long> getExpectedRecord() {
+               return expectedRecord;
+       }
+
+       private synchronized void finishProcessingExpectedRecords() {
+               checkState(expectedRecord.isDone());
+               checkState(!recordsProcessed.isDone());
+
+               recordsProcessed.complete(null);
+               expectedRecord = new CompletableFuture<>();
+               recordsProcessed = new CompletableFuture<>();
+       }
+
+       @Override
+       public void go() throws Exception {
+               try {
+                       while (running) {
+                               readRecords(getExpectedRecord().get());
+                               finishProcessingExpectedRecords();
+                       }
+               }
+               catch (InterruptedException e) {
+                       if (running) {
+                               throw e;
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+       }
+
+       protected abstract void readRecords(long lastExpectedRecord) throws 
Exception;
+
+       public void shutdown() {
+               running = false;
+               interrupt();
+               expectedRecord.complete(0L);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8161911/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
new file mode 100644
index 0000000..848c018
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserializes incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+       private final MutableRecordReader<LongValue> reader;
+
+       @SuppressWarnings("WeakerAccess")
+       public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
+               super(expectedRepetitionsOfExpectedRecord);
+               this.reader = new MutableRecordReader<>(
+                       inputGate,
+                       new String[]{
+                               EnvironmentInformation.getTemporaryFileDirectory()
+                       });
+       }
+
+       /**
+        * Reads records until {@code lastExpectedRecord} has been seen
+        * {@code expectedRepetitionsOfExpectedRecord} times, the reader has no more records, or the
+        * thread is no longer running.
+        *
+        * @param lastExpectedRecord
+        *              record value whose occurrences are counted
+        */
+       @Override
+       protected void readRecords(long lastExpectedRecord) throws Exception {
+               LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
+               final LongValue value = new LongValue();
+
+               while (running && reader.next(value)) {
+                       final long ts = value.getValue();
+                       if (ts == lastExpectedRecord) {
+                               expectedRecordCounter++;
+                               if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
+                                       break;
+                               }
+                       }
+               }
+       }
+}

Reply via email to