[hotfix][benchmarks] Add network stack benchmarks for LocalInputChannels
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08f7284a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08f7284a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08f7284a Branch: refs/heads/master Commit: 08f7284ab047ada1be0c7e0447000f179d8f33a2 Parents: 5b1e127 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Feb 1 12:05:59 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:42 2018 +0100 ---------------------------------------------------------------------- .../StreamNetworkBenchmarkEnvironment.java | 19 ++++++++++++------ .../StreamNetworkPointToPointBenchmark.java | 2 +- .../StreamNetworkThroughputBenchmark.java | 21 ++++++++++++-------- .../StreamNetworkThroughputBenchmarkTests.java | 16 +++++++++++++++ 4 files changed, 43 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 553503b..b1613f2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -89,20 +89,27 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { protected IOManager ioManager; protected int channels; + protected boolean localMode = false; protected ResultPartitionID[] partitionIds; - public void setUp(int writers, int channels) throws Exception { + public void setUp(int writers, int channels, boolean localMode) throws Exception { + this.localMode = localMode; this.channels = channels; this.partitionIds = new ResultPartitionID[writers]; - int bufferPoolSize = Math.max(2048, writers * channels * 4); - senderEnv = createNettyNetworkEnvironment(bufferPoolSize); - receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); ioManager = new IOManagerAsync(); + int bufferPoolSize = Math.max(2048, writers * channels * 4); + senderEnv = createNettyNetworkEnvironment(bufferPoolSize); senderEnv.start(); - receiverEnv.start(); + if (localMode) { + receiverEnv = senderEnv; + } + else { + receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); + receiverEnv.start(); + } generatePartitionIds(); } @@ -206,7 +213,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) .map(partitionId -> new InputChannelDeploymentDescriptor( partitionId, - ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) + localMode ? ResultPartitionLocation.createLocal() : ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) .toArray(InputChannelDeploymentDescriptor[]::new); final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index 843d3e2..cc302e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -70,7 +70,7 @@ public class StreamNetworkPointToPointBenchmark { */ public void setUp(long flushTimeout) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(1, 1); + environment.setUp(1, 1, false); receiver = environment.createReceiver(); recordWriter = environment.createRecordWriter(0, flushTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java index 3f41b00..fe08993 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -28,19 +28,20 @@ import java.util.concurrent.TimeUnit; * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project. */ public class StreamNetworkThroughputBenchmark { - private static final long RECEIVER_TIMEOUT = 30_000; - private StreamNetworkBenchmarkEnvironment<LongValue> environment; private ReceiverThread receiver; private LongRecordWriterThread[] writerThreads; + public void executeBenchmark(long records) throws Exception { + executeBenchmark(records, Long.MAX_VALUE); + } + /** * Executes the throughput benchmark with the given number of records. * - * @param records - * records to pass through the network stack + * @param records to pass through the network stack */ - public void executeBenchmark(long records) throws Exception { + public void executeBenchmark(long records, long timeout) throws Exception { final LongValue value = new LongValue(); value.setValue(0); @@ -51,7 +52,11 @@ public class StreamNetworkThroughputBenchmark { writerThread.setRecordsToSend(lastRecord); } - recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); + recordsReceived.get(timeout, TimeUnit.MILLISECONDS); + } + + public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception { + setUp(recordWriters, channels, flushTimeout, false); } /** @@ -63,9 +68,9 @@ public class StreamNetworkThroughputBenchmark { * @param channels * number of outgoing channels / receivers */ - public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception { + public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(recordWriters, channels); + environment.setUp(recordWriters, channels, localMode); receiver = environment.createReceiver(); writerThreads = new LongRecordWriterThread[recordWriters]; for (int writer = 0; writer < recordWriters; writer++) { http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java index 8af8148..a8251a8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java @@ -37,6 +37,22 @@ public class StreamNetworkThroughputBenchmarkTests { } @Test + public void largeLocalMode() throws Exception { + StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark(); + env.setUp(4, 10, 100, true); + env.executeBenchmark(10_000_000); + env.tearDown(); + } + + @Test + public void largeRemoteMode() throws Exception { + StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark(); + env.setUp(4, 10, 100, false); + env.executeBenchmark(10_000_000); + env.tearDown(); + } + + @Test public void pointToMultiPointBenchmark() throws Exception { StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); benchmark.setUp(1, 100, 100);