This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b402108f9bc [FLINK-32506][connectors/common] Add the benchmark for watermark aggregation when enabling the watermark alignment
b402108f9bc is described below

commit b402108f9bc468ed5223a5d73ea0bfcfa0085cfe
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Sat Jul 1 00:32:58 2023 +0800

    [FLINK-32506][connectors/common] Add the benchmark for watermark aggregation when enabling the watermark alignment
---
 .../source/coordinator/CoordinatorTestUtils.java   | 16 ++++
 .../SourceCoordinatorAlignmentBenchmark.java       | 93 ++++++++++++++++++++++
 .../SourceCoordinatorAlignmentBenchmarkTest.java   | 33 ++++++++
 .../SourceCoordinatorAlignmentTest.java            |  2 +-
 .../coordinator/SourceCoordinatorTestBase.java     | 18 +----
 5 files changed, 144 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
index 6811ae712a4..c1d0d490764 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import java.util.ArrayList;
@@ -27,6 +28,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,4 +69,17 @@ class CoordinatorTestUtils {
             ThrowingRunnable<Throwable> runnable, String failureMessage, String errorMessage) {
         assertThatThrownBy(runnable::run, failureMessage).hasStackTraceContaining(errorMessage);
     }
+
+    static void waitForCoordinatorToProcessActions(SourceCoordinatorContext<?> context) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        context.runInCoordinatorThread(() -> future.complete(null));
+
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            throw new AssertionError("test interrupted");
+        } catch (ExecutionException e) {
+            ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e));
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java
new file mode 100644
index 00000000000..2986f0a9878
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
+
+import java.util.Random;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.waitForCoordinatorToProcessActions;
+
+/** The benchmark of watermark alignment. */
+public class SourceCoordinatorAlignmentBenchmark {
+
+    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
+
+    private SourceCoordinator<?, ?> sourceCoordinator;
+
+    private int numSubtasks;
+
+    private long second;
+
+    private long[] randomMilliseconds;
+
+    public SourceCoordinatorAlignmentBenchmark() {}
+
+    public void setup(int numSubtasks) throws Exception {
+        SourceCoordinatorProvider<MockSourceSplit> provider =
+                new SourceCoordinatorProvider<>(
+                        "SourceCoordinatorProviderTest",
+                        OPERATOR_ID,
+                        new MockSource(Boundedness.BOUNDED, 2),
+                        1,
+                        new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE),
+                        null);
+        this.sourceCoordinator =
+                (SourceCoordinator<?, ?>)
+                        provider.getCoordinator(
+                                new MockOperatorCoordinatorContext(OPERATOR_ID, numSubtasks));
+        this.sourceCoordinator.start();
+        this.numSubtasks = numSubtasks;
+        this.second = 0;
+        this.randomMilliseconds = generateRandomMilliseconds(numSubtasks);
+
+        // Initialize the watermark for all subtasks.
+        sendReportedWatermarkToAllSubtasks();
+    }
+
+    public void teardown() throws Exception {
+        sourceCoordinator.close();
+    }
+
+    public void sendReportedWatermarkToAllSubtasks() {
+        for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+            sourceCoordinator.handleEventFromOperator(
+                    subtaskIndex,
+                    0,
+                    new ReportedWatermarkEvent(second + randomMilliseconds[subtaskIndex]));
+        }
+        waitForCoordinatorToProcessActions(sourceCoordinator.getContext());
+        second += 100_000;
+    }
+
+    private long[] generateRandomMilliseconds(int numSubtasks) {
+        Random random = new Random();
+        long[] randomMilliseconds = new long[numSubtasks];
+        for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) {
+            randomMilliseconds[subtaskIndex] = random.nextInt(1000);
+        }
+        return randomMilliseconds;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java
new file mode 100644
index 00000000000..79b7d215ddc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.junit.jupiter.api.Test;
+
+/** The test for {@link SourceCoordinatorAlignmentBenchmark}. */
+class SourceCoordinatorAlignmentBenchmarkTest {
+
+    @Test
+    void testWatermarkAggregation() throws Exception {
+        SourceCoordinatorAlignmentBenchmark benchmark = new SourceCoordinatorAlignmentBenchmark();
+        benchmark.setup(5000);
+        benchmark.sendReportedWatermarkToAllSubtasks();
+        benchmark.teardown();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
index 2bc15fc336a..b73586c9196 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
@@ -215,7 +215,7 @@ class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {
             SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long watermark) {
         sourceCoordinator1.handleEventFromOperator(
                 subtask, 0, new ReportedWatermarkEvent(watermark));
-        waitForCoordinatorToProcessActions(sourceCoordinator1.getContext());
+        CoordinatorTestUtils.waitForCoordinatorToProcessActions(sourceCoordinator1.getContext());
         sourceCoordinator1.announceCombinedWatermark();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 8fba35df0d3..930923799b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.junit.jupiter.api.AfterEach;
@@ -43,8 +42,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 
@@ -148,20 +145,7 @@ abstract class SourceCoordinatorTestBase {
     }
 
     protected void waitForCoordinatorToProcessActions() {
-        waitForCoordinatorToProcessActions(context);
-    }
-
-    protected void waitForCoordinatorToProcessActions(SourceCoordinatorContext<?> context) {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        context.runInCoordinatorThread(() -> future.complete(null));
-
-        try {
-            future.get();
-        } catch (InterruptedException e) {
-            throw new AssertionError("test interrupted");
-        } catch (ExecutionException e) {
-            ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e));
-        }
+        CoordinatorTestUtils.waitForCoordinatorToProcessActions(context);
     }
 
     void waitForSentEvents(int expectedEventNumber) throws Exception {

Reply via email to