This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b402108f9bc [FLINK-32506][connectors/common] Add the benchmark for watermark aggregation when enabling the watermark alignment b402108f9bc is described below commit b402108f9bc468ed5223a5d73ea0bfcfa0085cfe Author: 1996fanrui <1996fan...@gmail.com> AuthorDate: Sat Jul 1 00:32:58 2023 +0800 [FLINK-32506][connectors/common] Add the benchmark for watermark aggregation when enabling the watermark alignment --- .../source/coordinator/CoordinatorTestUtils.java | 16 ++++ .../SourceCoordinatorAlignmentBenchmark.java | 93 ++++++++++++++++++++++ .../SourceCoordinatorAlignmentBenchmarkTest.java | 33 ++++++++ .../SourceCoordinatorAlignmentTest.java | 2 +- .../coordinator/SourceCoordinatorTestBase.java | 18 +---- 5 files changed, 144 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java index 6811ae712a4..c1d0d490764 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingRunnable; import java.util.ArrayList; @@ -27,6 +28,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -66,4 +69,17 @@ class CoordinatorTestUtils { ThrowingRunnable<Throwable> runnable, String failureMessage, String errorMessage) { assertThatThrownBy(runnable::run, failureMessage).hasStackTraceContaining(errorMessage); } + + static void waitForCoordinatorToProcessActions(SourceCoordinatorContext<?> context) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + context.runInCoordinatorThread(() -> future.complete(null)); + + try { + future.get(); + } catch (InterruptedException e) { + throw new AssertionError("test interrupted"); + } catch (ExecutionException e) { + ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e)); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java new file mode 100644 index 00000000000..2986f0a9878 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmark.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.coordinator; + +import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; + +import java.util.Random; + +import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.waitForCoordinatorToProcessActions; + +/** The benchmark of watermark alignment. */ +public class SourceCoordinatorAlignmentBenchmark { + + private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L); + + private SourceCoordinator<?, ?> sourceCoordinator; + + private int numSubtasks; + + private long second; + + private long[] randomMilliseconds; + + public SourceCoordinatorAlignmentBenchmark() {} + + public void setup(int numSubtasks) throws Exception { + SourceCoordinatorProvider<MockSourceSplit> provider = + new SourceCoordinatorProvider<>( + "SourceCoordinatorProviderTest", + OPERATOR_ID, + new MockSource(Boundedness.BOUNDED, 2), + 1, + new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE), + null); + this.sourceCoordinator = + (SourceCoordinator<?, ?>) + provider.getCoordinator( + new MockOperatorCoordinatorContext(OPERATOR_ID, numSubtasks)); + this.sourceCoordinator.start(); + this.numSubtasks = numSubtasks; + this.second = 0; + this.randomMilliseconds = generateRandomMilliseconds(numSubtasks); + + // Initialize the watermark for all subtasks. + sendReportedWatermarkToAllSubtasks(); + } + + public void teardown() throws Exception { + sourceCoordinator.close(); + } + + public void sendReportedWatermarkToAllSubtasks() { + for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) { + sourceCoordinator.handleEventFromOperator( + subtaskIndex, + 0, + new ReportedWatermarkEvent(second + randomMilliseconds[subtaskIndex])); + } + waitForCoordinatorToProcessActions(sourceCoordinator.getContext()); + second += 100_000; + } + + private long[] generateRandomMilliseconds(int numSubtasks) { + Random random = new Random(); + long[] randomMilliseconds = new long[numSubtasks]; + for (int subtaskIndex = 0; subtaskIndex < numSubtasks; subtaskIndex++) { + randomMilliseconds[subtaskIndex] = random.nextInt(1000); + } + return randomMilliseconds; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java new file mode 100644 index 00000000000..79b7d215ddc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentBenchmarkTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.coordinator; + +import org.junit.jupiter.api.Test; + +/** The test for {@link SourceCoordinatorAlignmentBenchmark}. */ +class SourceCoordinatorAlignmentBenchmarkTest { + + @Test + void testWatermarkAggregation() throws Exception { + SourceCoordinatorAlignmentBenchmark benchmark = new SourceCoordinatorAlignmentBenchmark(); + benchmark.setup(5000); + benchmark.sendReportedWatermarkToAllSubtasks(); + benchmark.teardown(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java index 2bc15fc336a..b73586c9196 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -215,7 +215,7 @@ class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase { SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long watermark) { sourceCoordinator1.handleEventFromOperator( subtask, 0, new ReportedWatermarkEvent(watermark)); - waitForCoordinatorToProcessActions(sourceCoordinator1.getContext()); + CoordinatorTestUtils.waitForCoordinatorToProcessActions(sourceCoordinator1.getContext()); sourceCoordinator1.announceCombinedWatermark(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index 8fba35df0d3..930923799b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.AfterEach; @@ -43,8 +42,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.function.Supplier; @@ -148,20 +145,7 @@ abstract class SourceCoordinatorTestBase { } protected void waitForCoordinatorToProcessActions() { - waitForCoordinatorToProcessActions(context); - } - - protected void waitForCoordinatorToProcessActions(SourceCoordinatorContext<?> context) { - final CompletableFuture<Void> future = new CompletableFuture<>(); - context.runInCoordinatorThread(() -> future.complete(null)); - - try { - future.get(); - } catch (InterruptedException e) { - throw new AssertionError("test interrupted"); - } catch (ExecutionException e) { - ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e)); - } + CoordinatorTestUtils.waitForCoordinatorToProcessActions(context); } void waitForSentEvents(int expectedEventNumber) throws Exception {