This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new 517f1e7  [FLINK-24300] Added benchmarks for idling chained sources
517f1e7 is described below

commit 517f1e7301b6a7086a3d8b70067da88ba6294022
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Sep 16 20:16:20 2021 +0200

    [FLINK-24300] Added benchmarks for idling chained sources
    
    This closes #31
---
 pom.xml                                            |   6 ++
 .../flink/benchmark/MultipleInputBenchmark.java    | 106 +++++++++++++++++++--
 2 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index f0f0bda..f92f166 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,12 @@ under the License.
                        <type>jar</type>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+               </dependency>
 
                <!-- serialization benchmark requirements -->
 
diff --git a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index f070c59..53c6115 100644
--- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -18,14 +18,27 @@
 
 package org.apache.flink.benchmark;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.benchmark.functions.LongSource;
 import org.apache.flink.benchmark.functions.QueuingLongSource;
 import org.apache.flink.benchmark.operators.MultiplyByTwoOperatorFactory;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 
 import org.openjdk.jmh.annotations.Benchmark;
@@ -36,18 +49,19 @@ import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
-public class MultipleInputBenchmark extends BenchmarkBase {
+import java.util.concurrent.CompletableFuture;
 
+public class MultipleInputBenchmark extends BenchmarkBase {
        /** Records per invocation; taken from {@link TwoInputBenchmark} so both benchmarks agree. */
        public static final int RECORDS_PER_INVOCATION = TwoInputBenchmark.RECORDS_PER_INVOCATION;

        /** Taken from {@link TwoInputBenchmark#ONE_IDLE_RECORDS_PER_INVOCATION} so both benchmarks agree. */
        public static final int ONE_IDLE_RECORDS_PER_INVOCATION = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION;

        /** Checkpoint interval in milliseconds; taken from {@link TwoInputBenchmark#CHECKPOINT_INTERVAL_MS}. */
        public static final long CHECKPOINT_INTERVAL_MS = TwoInputBenchmark.CHECKPOINT_INTERVAL_MS;
 
        public static void main(String[] args)
-               throws RunnerException {
+                       throws RunnerException {
                Options options = new OptionsBuilder()
-                       .verbosity(VerboseMode.NORMAL)
-                       .include(".*" + 
MultipleInputBenchmark.class.getSimpleName() + ".*")
-                       .build();
+                               .verbosity(VerboseMode.NORMAL)
+                               .include(".*" + 
MultipleInputBenchmark.class.getSimpleName() + ".*")
+                               .build();
 
                new Runner(options).run();
        }
@@ -82,10 +96,88 @@ public class MultipleInputBenchmark extends BenchmarkBase {
                env.execute();
        }
 
+       @Benchmark
+       @OperationsPerInvocation(RECORDS_PER_INVOCATION)
+       public void multiInputChainedIdleSource(FlinkEnvironmentContext 
context) throws Exception {
+               final StreamExecutionEnvironment env = context.env;
+               env.getConfig().enableObjectReuse();
+
+               final DataStream<Long> source1 =
+                               env.fromSource(
+                                               new NumberSequenceSource(1L, 
RECORDS_PER_INVOCATION),
+                                               
WatermarkStrategy.noWatermarks(),
+                                               "source-1");
+
+               final DataStreamSource<Integer> source2 =
+                               env.fromSource(new IdlingSource(1), 
WatermarkStrategy.noWatermarks(), "source-2");
+
+               MultipleInputTransformation<Long> transform = new 
MultipleInputTransformation<>(
+                               "custom operator",
+                               new MultiplyByTwoOperatorFactory(),
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               1);
+
+               transform.addInput(((DataStream<?>) 
source1).getTransformation());
+               transform.addInput(((DataStream<?>) 
source2).getTransformation());
+               
transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+               env.addOperator(transform);
+               new 
MultipleConnectedStreams(env).transform(transform).addSink(new 
SinkClosingIdlingSource()).setParallelism(1);
+               context.execute();
+       }
+
+       private static class IdlingSource extends MockSource {
+               private static CompletableFuture<Void> canFinish = new 
CompletableFuture<>();
+
+               public static void signalCanFinish() {
+                       canFinish.complete(null);
+               }
+
+               public static void reset() {
+                       canFinish.completeExceptionally(new 
IllegalStateException("State has been reset"));
+                       canFinish = new CompletableFuture<>();
+               }
+
+               public IdlingSource(int numSplits) {
+                       super(Boundedness.BOUNDED, numSplits, true, true);
+               }
+
+               @Override
+               public SourceReader<Integer, MockSourceSplit> createReader(
+                               SourceReaderContext readerContext) {
+                       return new MockSourceReader(true, true) {
+                               @Override
+                               public InputStatus 
pollNext(ReaderOutput<Integer> sourceOutput) {
+                                       if (canFinish.isDone() && 
!canFinish.isCompletedExceptionally()) {
+                                               return InputStatus.END_OF_INPUT;
+                                       } else {
+                                               return 
InputStatus.NOTHING_AVAILABLE;
+                                       }
+                               }
+
+                               @Override
+                               public synchronized CompletableFuture<Void> 
isAvailable() {
+                                       return canFinish;
+                               }
+                       };
+               }
+       }
+
+       private static class SinkClosingIdlingSource implements 
SinkFunction<Long> {
+               private int recordsSoFar = 0;
+
+               @Override
+               public void invoke(Long value) {
+                       if (++recordsSoFar >= RECORDS_PER_INVOCATION) {
+                               IdlingSource.signalCanFinish();
+                       }
+               }
+       }
+
        private static void connectAndDiscard(
                        StreamExecutionEnvironment env,
-                       DataStreamSource<Long> source1,
-                       DataStreamSource<Long> source2) {
+                       DataStream<?> source1,
+                       DataStream<?> source2) {
                MultipleInputTransformation<Long> transform = new 
MultipleInputTransformation<>(
                                "custom operator",
                                new MultiplyByTwoOperatorFactory(),

Reply via email to