[FLINK-8221][network-benchmarks] Define latency network benchmarks in Flink project
This closes #5255. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2558ae51 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2558ae51 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2558ae51 Branch: refs/heads/master Commit: 2558ae51140af04d241c77945bf9747b763c0ee8 Parents: c816191 Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Dec 7 10:03:49 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Jan 8 13:57:02 2018 +0100 ---------------------------------------------------------------------- .../StreamNetworkBenchmarkEnvironment.java | 40 +++++++++ .../StreamNetworkPointToPointBenchmark.java | 87 ++++++++++++++++++++ .../StreamNetworkPointToPointBenchmarkTest.java | 38 +++++++++ 3 files changed, 165 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java new file mode 100644 index 0000000..acbbdf8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; +import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment; +import org.apache.flink.streaming.runtime.io.StreamRecordWriter; + +/** + * Context for stream network benchmarks executed by the external + * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project. 
+ */ +public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> extends NetworkBenchmarkEnvironment<T> { + /** Creates a {@link StreamRecordWriter} with the given flush timeout and a {@link RoundRobinChannelSelector}, writing to a result partition created for {@code partitionIds[partitionIndex]}. */ + public RecordWriter<T> createStreamRecordWriter(int partitionIndex, long flushTimeout) + throws Exception { + ResultPartitionWriter sender = + createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); + return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java new file mode 100644 index 0000000..9286485 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.benchmark.ReceiverThread; +import org.apache.flink.types.LongValue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Streaming point-to-point latency network benchmarks executed by the external + * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project. + */ +public class StreamNetworkPointToPointBenchmark { + private static final long RECEIVER_TIMEOUT = 2000; + + private StreamNetworkBenchmarkEnvironment<LongValue> environment; + private ReceiverThread receiver; + private RecordWriter<LongValue> recordWriter; + + /** + * Executes the latency benchmark with the given number of records: emits {@code records - 1} records of value 0, broadcasts a final record carrying the value {@code records}, and then waits up to {@code RECEIVER_TIMEOUT} milliseconds for the future returned by the receiver to complete. + * + * @param records + * records to pass through the network stack + * @param flushAfterLastEmit + * whether to flush the {@link RecordWriter} after the last record + */ + public void executeBenchmark(long records, boolean flushAfterLastEmit) throws Exception { + final LongValue value = new LongValue(); + value.setValue(0); + + CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(records); + /* long counter: an int counter would wrap around and never reach the bound once records exceeds Integer.MAX_VALUE */ + for (long i = 1; i < records; i++) { + recordWriter.emit(value); + } + value.setValue(records); + recordWriter.broadcastEmit(value); + if (flushAfterLastEmit) { + recordWriter.flush(); + } + + recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** + * Initializes the latency benchmark with the given parameters. 
+ * + * @param flushTimeout + * output flushing interval of the + * {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread + */ + public void setUp(long flushTimeout) throws Exception { + environment = new StreamNetworkBenchmarkEnvironment<>(); + environment.setUp(1, 1); + /* NOTE(review): setUp(1, 1) presumably configures a single writer and a single channel - confirm against NetworkBenchmarkEnvironment; the record writer below is created for partition index 0. */ + receiver = environment.createReceiver(); + recordWriter = environment.createStreamRecordWriter(0, flushTimeout); + } + + /** + * Shuts down a benchmark previously set up via {@link #setUp}: tears down the environment first, then shuts down the receiver. + */ + public void tearDown() { + environment.tearDown(); + receiver.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2558ae51/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java new file mode 100644 index 0000000..7d9259c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmarkTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.junit.Test; + +/** + * Tests for {@link StreamNetworkPointToPointBenchmark}: sets the benchmark up with a flush timeout of 10, runs it once with 100 records and no flush after the last emit, and tears it down in a {@code finally} block so the benchmark is released even when the run throws. + */ +public class StreamNetworkPointToPointBenchmarkTest { + @Test + public void test() throws Exception { + StreamNetworkPointToPointBenchmark benchmark = new StreamNetworkPointToPointBenchmark(); + benchmark.setUp(10); + try { + benchmark.executeBenchmark(100, false); + } + finally { + benchmark.tearDown(); + } + } +}