This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed195cfedbbcee45b9eee33b054a209b20f09b39 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Thu Apr 20 14:42:15 2023 +0200 [hotfix][test] Adds generic testing implementations for LeaderContender and LeaderElectionDriver Signed-off-by: Matthias Pohl <matthias.p...@aiven.io> --- .../TestingGenericLeaderContender.java | 116 +++++++++++++++++++++ .../TestingGenericLeaderElectionDriver.java | 94 +++++++++++++++++ 2 files changed, 210 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java new file mode 100644 index 00000000000..c385cb20c2a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * {@code TestingGenericLeaderContender} is a more generic testing implementation of the {@link + * LeaderContender} interface. 
+ */ +public class TestingGenericLeaderContender implements LeaderContender { + + private final Object lock = new Object(); + + private final Consumer<UUID> grantLeadershipConsumer; + private final Runnable revokeLeadershipRunnable; + private final Consumer<Exception> handleErrorConsumer; + private final Supplier<String> getDescriptionSupplier; + + private TestingGenericLeaderContender( + Consumer<UUID> grantLeadershipConsumer, + Runnable revokeLeadershipRunnable, + Consumer<Exception> handleErrorConsumer, + Supplier<String> getDescriptionSupplier) { + this.grantLeadershipConsumer = grantLeadershipConsumer; + this.revokeLeadershipRunnable = revokeLeadershipRunnable; + this.handleErrorConsumer = handleErrorConsumer; + this.getDescriptionSupplier = getDescriptionSupplier; + } + + @Override + public void grantLeadership(UUID leaderSessionID) { + synchronized (lock) { + grantLeadershipConsumer.accept(leaderSessionID); + } + } + + @Override + public void revokeLeadership() { + synchronized (lock) { + revokeLeadershipRunnable.run(); + } + } + + @Override + public void handleError(Exception exception) { + synchronized (lock) { + handleErrorConsumer.accept(exception); + } + } + + @Override + public String getDescription() { + return getDescriptionSupplier.get(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** {@code Builder} for creating {@code TestingGenericLeaderContender} instances. 
*/ + public static class Builder { + private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> {}; + private Runnable revokeLeadershipRunnable = () -> {}; + private Consumer<Exception> handleErrorConsumer = ignoredError -> {}; + private Supplier<String> getDescriptionSupplier = () -> "testing contender"; + + private Builder() {} + + public Builder setGrantLeadershipConsumer(Consumer<UUID> grantLeadershipConsumer) { + this.grantLeadershipConsumer = grantLeadershipConsumer; + return this; + } + + public Builder setRevokeLeadershipRunnable(Runnable revokeLeadershipRunnable) { + this.revokeLeadershipRunnable = revokeLeadershipRunnable; + return this; + } + + public Builder setHandleErrorConsumer(Consumer<Exception> handleErrorConsumer) { + this.handleErrorConsumer = handleErrorConsumer; + return this; + } + + public Builder setGetDescriptionSupplier(Supplier<String> getDescriptionSupplier) { + this.getDescriptionSupplier = getDescriptionSupplier; + return this; + } + + public TestingGenericLeaderContender build() { + return new TestingGenericLeaderContender( + grantLeadershipConsumer, + revokeLeadershipRunnable, + handleErrorConsumer, + getDescriptionSupplier); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java new file mode 100644 index 00000000000..ba99288968b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.function.ThrowingRunnable; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * {@code TestingGenericLeaderElectionDriver} is a test implementation of {@link + * LeaderElectionDriver} to support test cases in the most generic way. + */ +public class TestingGenericLeaderElectionDriver implements LeaderElectionDriver { + + private final Consumer<LeaderInformation> writeLeaderInformationConsumer; + private final Supplier<Boolean> hasLeadershipSupplier; + private final ThrowingRunnable<Exception> closeRunnable; + + private TestingGenericLeaderElectionDriver( + Consumer<LeaderInformation> writeLeaderInformationConsumer, + Supplier<Boolean> hasLeadershipSupplier, + ThrowingRunnable<Exception> closeRunnable) { + this.writeLeaderInformationConsumer = writeLeaderInformationConsumer; + this.hasLeadershipSupplier = hasLeadershipSupplier; + this.closeRunnable = closeRunnable; + } + + @Override + public void writeLeaderInformation(LeaderInformation leaderInformation) { + writeLeaderInformationConsumer.accept(leaderInformation); + } + + @Override + public boolean hasLeadership() { + return hasLeadershipSupplier.get(); + } + + @Override + public void close() throws Exception { + closeRunnable.run(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** {@code 
Builder} for creating {@code TestingGenericLeaderElectionDriver} instances. */ + public static class Builder { + private Consumer<LeaderInformation> writeLeaderInformationConsumer = + ignoredLeaderInformation -> {}; + private Supplier<Boolean> hasLeadershipSupplier = () -> false; + private ThrowingRunnable<Exception> closeRunnable = () -> {}; + + private Builder() {} + + public Builder setWriteLeaderInformationConsumer( + Consumer<LeaderInformation> writeLeaderInformationConsumer) { + this.writeLeaderInformationConsumer = writeLeaderInformationConsumer; + return this; + } + + public Builder setHasLeadershipSupplier(Supplier<Boolean> hasLeadershipSupplier) { + this.hasLeadershipSupplier = hasLeadershipSupplier; + return this; + } + + public Builder setCloseRunnable(ThrowingRunnable<Exception> closeRunnable) { + this.closeRunnable = closeRunnable; + return this; + } + + public TestingGenericLeaderElectionDriver build() { + return new TestingGenericLeaderElectionDriver( + writeLeaderInformationConsumer, hasLeadershipSupplier, closeRunnable); + } + } +}