This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 94993944130e7975e0a4b99855b89dfd2e59a158
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri Feb 21 16:36:05 2020 +0100

    [FLINK-18905] Provide basic benchmarks for MultipleInputStreamOperator
---
 ...tBenchmark.java => MultipleInputBenchmark.java} | 65 ++++++++++----------
 .../apache/flink/benchmark/TwoInputBenchmark.java  |  4 +-
 .../operators/MultiplyByTwoOperatorFactory.java    | 70 ++++++++++++++++++++++
 3 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
similarity index 60%
copy from src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
copy to src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index d9291a8..f070c59 100644
--- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.benchmark.functions.LongSource;
 import org.apache.flink.benchmark.functions.QueuingLongSource;
-import org.apache.flink.benchmark.operators.MultiplyByTwoCoStreamMap;
+import org.apache.flink.benchmark.operators.MultiplyByTwoOperatorFactory;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -34,67 +36,68 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
-public class TwoInputBenchmark extends BenchmarkBase {
+public class MultipleInputBenchmark extends BenchmarkBase {
 
-       public static final int RECORDS_PER_INVOCATION = 25_000_000;
-
-       public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000;
-
-       private static final long CHECKPOINT_INTERVAL_MS = 100;
+       public static final int RECORDS_PER_INVOCATION = 
TwoInputBenchmark.RECORDS_PER_INVOCATION;
+       public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 
TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION;
+       public static final long CHECKPOINT_INTERVAL_MS = 
TwoInputBenchmark.CHECKPOINT_INTERVAL_MS;
 
        public static void main(String[] args)
                throws RunnerException {
                Options options = new OptionsBuilder()
                        .verbosity(VerboseMode.NORMAL)
-                       .include(".*" + 
TwoInputBenchmark.class.getCanonicalName() + ".*")
+                       .include(".*" + 
MultipleInputBenchmark.class.getSimpleName() + ".*")
                        .build();
 
                new Runner(options).run();
        }
 
        @Benchmark
-       @OperationsPerInvocation(value = 
TwoInputBenchmark.RECORDS_PER_INVOCATION)
-       public void twoInputMapSink(FlinkEnvironmentContext context) throws 
Exception {
+       @OperationsPerInvocation(RECORDS_PER_INVOCATION)
+       public void multiInputMapSink(FlinkEnvironmentContext context) throws 
Exception {
 
                StreamExecutionEnvironment env = context.env;
-
                env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-               env.setParallelism(1);
-
-               // Setting buffer timeout to 1 is an attempt to improve 
twoInputMapSink benchmark stability.
-               // Without 1ms buffer timeout, some JVM forks are much slower 
then others, making results
-               // unstable and unreliable.
-               env.setBufferTimeout(1);
 
                long numRecordsPerInput = RECORDS_PER_INVOCATION / 2;
                DataStreamSource<Long> source1 = env.addSource(new 
LongSource(numRecordsPerInput));
                DataStreamSource<Long> source2 = env.addSource(new 
LongSource(numRecordsPerInput));
-
-               source1
-                       .connect(source2)
-                       .transform("custom operator", 
TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
-                       .addSink(new DiscardingSink<>());
+               connectAndDiscard(env, source1, source2);
 
                env.execute();
        }
 
        @Benchmark
-       @OperationsPerInvocation(value = 
TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION)
-       public void twoInputOneIdleMapSink(FlinkEnvironmentContext context) 
throws Exception {
+       @OperationsPerInvocation(ONE_IDLE_RECORDS_PER_INVOCATION)
+       public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) 
throws Exception {
 
                StreamExecutionEnvironment env = context.env;
                env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-               env.setParallelism(1);
 
                QueuingLongSource.reset();
                DataStreamSource<Long> source1 = env.addSource(new 
QueuingLongSource(1, ONE_IDLE_RECORDS_PER_INVOCATION - 1));
                DataStreamSource<Long> source2 = env.addSource(new 
QueuingLongSource(2, 1));
-
-               source1
-                               .connect(source2)
-                               .transform("custom operator", 
TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap())
-                               .addSink(new DiscardingSink<>());
+               connectAndDiscard(env, source1, source2);
 
                env.execute();
        }
+
+       private static void connectAndDiscard(
+                       StreamExecutionEnvironment env,
+                       DataStreamSource<Long> source1,
+                       DataStreamSource<Long> source2) {
+               MultipleInputTransformation<Long> transform = new 
MultipleInputTransformation<>(
+                               "custom operator",
+                               new MultiplyByTwoOperatorFactory(),
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               1);
+
+               transform.addInput(source1.getTransformation());
+               transform.addInput(source2.getTransformation());
+
+               env.addOperator(transform);
+               new MultipleConnectedStreams(env)
+                               .transform(transform)
+                               .addSink(new DiscardingSink<>());
+       }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
index d9291a8..fe39549 100644
--- a/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/TwoInputBenchmark.java
@@ -37,10 +37,8 @@ import org.openjdk.jmh.runner.options.VerboseMode;
 public class TwoInputBenchmark extends BenchmarkBase {
 
        public static final int RECORDS_PER_INVOCATION = 25_000_000;
-
        public static final int ONE_IDLE_RECORDS_PER_INVOCATION = 15_000_000;
-
-       private static final long CHECKPOINT_INTERVAL_MS = 100;
+       public static final long CHECKPOINT_INTERVAL_MS = 100;
 
        public static void main(String[] args)
                throws RunnerException {
diff --git a/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java
new file mode 100644
index 0000000..657bc3f
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoOperatorFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.operators;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Arrays;
+import java.util.List;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class MultiplyByTwoOperatorFactory extends 
AbstractStreamOperatorFactory<Long> {
+       @Override
+       public <T extends StreamOperator<Long>> T 
createStreamOperator(StreamOperatorParameters<Long> parameters) {
+               return (T) new MultiplyByTwoOperator(parameters);
+       }
+
+       @Override
+       public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+               return MultiplyByTwoOperator.class;
+       }
+
+       public static class MultiplyByTwoOperator extends 
AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> {
+               public MultiplyByTwoOperator(StreamOperatorParameters<Long> 
parameters) {
+                       super(parameters, 2);
+               }
+
+               @Override
+               public List<Input> getInputs() {
+                       return Arrays.asList(
+                                       new 
MultiplyByTwoOperator.MultiplyByTwoInput(this, 1),
+                                       new 
MultiplyByTwoOperator.MultiplyByTwoInput(this, 2));
+               }
+
+               private static class MultiplyByTwoInput extends 
AbstractInput<Long, Long> {
+                       MultiplyByTwoInput(
+                                       AbstractStreamOperatorV2<Long> owner,
+                                       int inputId) {
+                               super(owner, inputId);
+                       }
+
+                       @Override
+                       public void processElement(StreamRecord<Long> element) {
+                               
output.collect(element.replace(element.getValue() * 2));
+                       }
+               }
+       }
+}

Reply via email to