[FLINK-8221][network-benchmarks] Define latency network benchmarks in Flink 
project

This closes #5255.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2558ae51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2558ae51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2558ae51

Branch: refs/heads/master
Commit: 2558ae51140af04d241c77945bf9747b763c0ee8
Parents: c816191
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Dec 7 10:03:49 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Jan 8 13:57:02 2018 +0100

----------------------------------------------------------------------
 .../StreamNetworkBenchmarkEnvironment.java      | 40 +++++++++
 .../StreamNetworkPointToPointBenchmark.java     | 87 ++++++++++++++++++++
 .../StreamNetworkPointToPointBenchmarkTest.java | 38 +++++++++
 3 files changed, 165 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
new file mode 100644
index 0000000..acbbdf8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import 
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import 
org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+
+/**
+ * Context for stream network benchmarks executed by the external
+ * <a 
href="https://github.com/dataArtisans/flink-benchmarks";>flink-benchmarks</a> 
project.
+ */
+public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> 
extends NetworkBenchmarkEnvironment<T> {
+
+       public RecordWriter<T> createStreamRecordWriter(int partitionIndex, 
long flushTimeout)
+                       throws Exception {
+               ResultPartitionWriter sender =
+                       createResultPartition(jobId, 
partitionIds[partitionIndex], senderEnv, channels);
+               return new StreamRecordWriter<>(sender, new 
RoundRobinChannelSelector<>(), flushTimeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
new file mode 100644
index 0000000..9286485
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.benchmark.ReceiverThread;
+import org.apache.flink.types.LongValue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Streaming point-to-point latency network benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class StreamNetworkPointToPointBenchmark {
+       /** Maximum time, in milliseconds, to wait for the receiver to report the last record. */
+       private static final long RECEIVER_TIMEOUT = 2000;
+
+       private StreamNetworkBenchmarkEnvironment<LongValue> environment;
+       private ReceiverThread receiver;
+       private RecordWriter<LongValue> recordWriter;
+
+       /**
+        * Executes the latency benchmark with the given number of records.
+        *
+        * <p>All records but the last carry the value {@code 0}. The last record carries the
+        * value {@code records}, which is the value registered with the receiver beforehand, and
+        * is sent via {@link RecordWriter#broadcastEmit}. The method then blocks until the
+        * receiver's future completes or {@link #RECEIVER_TIMEOUT} milliseconds have passed.
+        *
+        * @param records
+        *              records to pass through the network stack
+        * @param flushAfterLastEmit
+        *              whether to flush the {@link RecordWriter} after the last record
+        * @throws Exception
+        *              if emitting fails or the receiver's future does not complete normally
+        *              in time (e.g. a {@link java.util.concurrent.TimeoutException})
+        */
+       public void executeBenchmark(long records, boolean flushAfterLastEmit) throws Exception {
+               final LongValue value = new LongValue();
+               value.setValue(0);
+
+               CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(records);
+
+               // The counter must be a long like 'records': an int counter would overflow and
+               // never reach the bound for records > Integer.MAX_VALUE.
+               for (long i = 1; i < records; i++) {
+                       recordWriter.emit(value);
+               }
+               // the final record is the one the receiver is waiting for
+               value.setValue(records);
+               recordWriter.broadcastEmit(value);
+               if (flushAfterLastEmit) {
+                       recordWriter.flush();
+               }
+
+               recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+       }
+
+       /**
+        * Initializes the latency benchmark with the given parameters.
+        *
+        * @param flushTimeout
+        *              output flushing interval of the
+        *              {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s
+        *              output flusher thread
+        * @throws Exception
+        *              if the environment, the receiver or the record writer cannot be set up
+        */
+       public void setUp(long flushTimeout) throws Exception {
+               environment = new StreamNetworkBenchmarkEnvironment<>();
+               // NOTE(review): (1, 1) is presumably one writer and one channel (point-to-point);
+               // confirm against NetworkBenchmarkEnvironment#setUp.
+               environment.setUp(1, 1);
+
+               receiver = environment.createReceiver();
+               recordWriter = environment.createStreamRecordWriter(0, flushTimeout);
+       }
+
+       /**
+        * Shuts down a benchmark previously set up via {@link #setUp}.
+        */
+       public void tearDown() {
+               try {
+                       environment.tearDown();
+               } finally {
+                       // shut the receiver down even if tearing down the environment failed
+                       receiver.shutdown();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java
new file mode 100644
index 0000000..7d9259c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamNetworkPointToPointBenchmark}.
+ */
+public class StreamNetworkPointToPointBenchmarkTest {
+       /** Flush timeout passed to {@link StreamNetworkPointToPointBenchmark#setUp}. */
+       private static final long FLUSH_TIMEOUT = 10;
+
+       /** Number of records pushed through the benchmark. */
+       private static final long RECORDS = 100;
+
+       /**
+        * Smoke test: sets the benchmark up, runs it once without flushing after the last
+        * record, and tears it down again regardless of the outcome of the run.
+        */
+       @Test
+       public void test() throws Exception {
+               final StreamNetworkPointToPointBenchmark latencyBenchmark =
+                       new StreamNetworkPointToPointBenchmark();
+               latencyBenchmark.setUp(FLUSH_TIMEOUT);
+               try {
+                       latencyBenchmark.executeBenchmark(RECORDS, false);
+               } finally {
+                       latencyBenchmark.tearDown();
+               }
+       }
+}

Reply via email to