This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 94993944130e7975e0a4b99855b89dfd2e59a158 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Fri Feb 21 16:36:05 2020 +0100 [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator --- ...tBenchmark.java => MultipleInputBenchmark.java} | 65 ++++++++++---------- .../apache/flink/benchmark/TwoInputBenchmark.java | 4 +- .../operators/MultiplyByTwoOperatorFactory.java | 70 ++++++++++++++++++++++ 3 files changed, 105 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java similarity index 60% copy from src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java copy to src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java index d9291a8..f070c59 100644 --- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java @@ -18,13 +18,15 @@ package org.apache.flink.benchmark; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.benchmark.functions.LongSource; import org.apache.flink.benchmark.functions.QueuingLongSource; -import org.apache.flink.benchmark.operators.MultiplyByTwoCoStreamMap; +import org.apache.flink.benchmark.operators.MultiplyByTwoOperatorFactory; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; @@ -34,67 +36,68 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.VerboseMode; -public class TwoInputBenchmark extends BenchmarkBase { +public class MultipleInputBenchmark extends BenchmarkBase { - public static final int RECORDS_PER_INVOCATION = 25_000_000; - - public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000; - - private static final long CHECKPOINT_INTERVAL_MS = 100; + public static final int RECORDS_PER_INVOCATION = TwoInputBenchmark.RECORDS_PER_INVOCATION; + public static final int ONE_IDLE_RECORDS_PER_INVOCATION = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION; + public static final long CHECKPOINT_INTERVAL_MS = TwoInputBenchmark.CHECKPOINT_INTERVAL_MS; public static void main(String[] args) throws RunnerException { Options options = new OptionsBuilder() .verbosity(VerboseMode.NORMAL) - .include(".*" + TwoInputBenchmark.class.getCanonicalName() + ".*") + .include(".*" + MultipleInputBenchmark.class.getSimpleName() + ".*") .build(); new Runner(options).run(); } @Benchmark - @OperationsPerInvocation(value = TwoInputBenchmark.RECORDS_PER_INVOCATION) - public void twoInputMapSink(FlinkEnvironmentContext context) throws Exception { + @OperationsPerInvocation(RECORDS_PER_INVOCATION) + public void multiInputMapSink(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; - env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); - env.setParallelism(1); - - // Setting buffer timeout to 1 is an attempt to improve twoInputMapSink benchmark stability. - // Without 1ms buffer timeout, some JVM forks are much slower then others, making results - // unstable and unreliable. - env.setBufferTimeout(1); long numRecordsPerInput = RECORDS_PER_INVOCATION / 2; DataStreamSource<Long> source1 = env.addSource(new LongSource(numRecordsPerInput)); DataStreamSource<Long> source2 = env.addSource(new LongSource(numRecordsPerInput)); - - source1 - .connect(source2) - .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap()) - .addSink(new DiscardingSink<>()); + connectAndDiscard(env, source1, source2); env.execute(); } @Benchmark - @OperationsPerInvocation(value = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION) - public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception { + @OperationsPerInvocation(ONE_IDLE_RECORDS_PER_INVOCATION) + public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exception { StreamExecutionEnvironment env = context.env; env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); - env.setParallelism(1); QueuingLongSource.reset(); DataStreamSource<Long> source1 = env.addSource(new QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1)); DataStreamSource<Long> source2 = env.addSource(new QueuingLongSource(2, 1)); - - source1 - .connect(source2) - .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap()) - .addSink(new DiscardingSink<>()); + connectAndDiscard(env, source1, source2); env.execute(); } + + private static void connectAndDiscard( + StreamExecutionEnvironment env, + DataStreamSource<Long> source1, + DataStreamSource<Long> source2) { + MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>( + "custom operator", + new MultiplyByTwoOperatorFactory(), + BasicTypeInfo.LONG_TYPE_INFO, + 1); + + transform.addInput(source1.getTransformation()); + transform.addInput(source2.getTransformation()); + + env.addOperator(transform); + new MultipleConnectedStreams(env) + .transform(transform) + .addSink(new DiscardingSink<>()); + } } diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java index d9291a8..fe39549 100644 --- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java @@ -37,10 +37,8 @@ import org.openjdk.jmh.runner.options.VerboseMode; public class TwoInputBenchmark extends BenchmarkBase { public static final int RECORDS_PER_INVOCATION = 25_000_000; - public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000; - - private static final long CHECKPOINT_INTERVAL_MS = 100; + public static final long CHECKPOINT_INTERVAL_MS = 100; public static void main(String[] args) throws RunnerException { diff --git a/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java new file mode 100644 index 0000000..657bc3f --- /dev/null +++ b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.operators; + +import org.apache.flink.streaming.api.operators.AbstractInput; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Arrays; +import java.util.List; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class MultiplyByTwoOperatorFactory extends AbstractStreamOperatorFactory<Long> { + @Override + public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) { + return (T) new MultiplyByTwoOperator(parameters); + } + + @Override + public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + return MultiplyByTwoOperator.class; + } + + public static class MultiplyByTwoOperator extends AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> { + public MultiplyByTwoOperator(StreamOperatorParameters<Long> parameters) { + super(parameters, 2); + } + + @Override + public List<Input> getInputs() { + return Arrays.asList( + new MultiplyByTwoOperator.MultiplyByTwoInput(this, 1), + new MultiplyByTwoOperator.MultiplyByTwoInput(this, 2)); + } + + private static class MultiplyByTwoInput extends AbstractInput<Long, Long> { + MultiplyByTwoInput( + AbstractStreamOperatorV2<Long> owner, + int inputId) { + super(owner, inputId); + } + + @Override + public void processElement(StreamRecord<Long> element) { + output.collect(element.replace(element.getValue() * 2)); + } + } + } +}