This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 62b792d58c4 [FLINK-31781][runtime] Introduces contender ID in LeaderElectionService interface 62b792d58c4 is described below commit 62b792d58c4f8d5b494b50daad2e5fc5047af330 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Tue May 16 18:42:49 2023 +0200 [FLINK-31781][runtime] Introduces contender ID in LeaderElectionService interface Signed-off-by: Matthias Pohl <matthias.p...@aiven.io> --- .../highavailability/AbstractHaServices.java | 7 +- .../AbstractLeaderElectionService.java | 36 +++--- .../leaderelection/DefaultLeaderElection.java | 23 ++-- .../DefaultLeaderElectionService.java | 60 ++++++--- .../leaderelection/LeaderElectionService.java | 5 +- .../JobMasterServiceLeadershipRunnerTest.java | 4 +- .../DefaultLeaderElectionServiceTest.java | 105 +++++++++++----- .../leaderelection/DefaultLeaderElectionTest.java | 140 ++++++++++++--------- .../runtime/leaderelection/LeaderElectionTest.java | 2 +- ...KeeperLeaderElectionConnectionHandlingTest.java | 3 +- .../ZooKeeperLeaderElectionTest.java | 9 +- 11 files changed, 247 insertions(+), 147 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index 4c24110c42a..401e63cc959 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -247,7 +247,12 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { leaderElectionService.startLeaderElectionBackend(); closeableRegistry.registerCloseable(leaderElectionService); - return leaderElectionService.createLeaderElection(); + // the leaderName which is passed as a contenderID here is not actively used within the + // DefaultLeaderElectionService for now - this will change in a future step where the + // DefaultLeaderElectionService will start to use MultipleComponentLeaderElectionDriver + // instead of LeaderElectionDriver and fully replace + // DefaultMultipleComponentLeaderElectionService (FLINK-31783) + return leaderElectionService.createLeaderElection("unused-contender-id"); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java index 0abad419329..eef2d731fa7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java @@ -27,29 +27,37 @@ import java.util.UUID; public abstract class AbstractLeaderElectionService implements LeaderElectionService { @Override - public LeaderElection createLeaderElection() { - return new DefaultLeaderElection(this); + public LeaderElection createLeaderElection(String contenderID) { + return new DefaultLeaderElection(this, contenderID); } /** - * Registers the given {@link LeaderContender} with the underlying {@code - * LeaderElectionService}. Leadership changes are starting to be reported to the {@code + * Registers the {@link LeaderContender} under the {@code contenderID} with the underlying + * {@code LeaderElectionService}. Leadership changes are starting to be reported to the {@code * LeaderContender}. */ - protected abstract void register(LeaderContender contender) throws Exception; + protected abstract void register(String contenderID, LeaderContender contender) + throws Exception; - /** Removes the passed {@code LeaderContender} from the {@code LeaderElectionService}. */ - protected abstract void remove(LeaderContender contender); + /** + * Removes the {@code LeaderContender} from the {@code LeaderElectionService} that is associated + * with the {@code contenderID}. + */ + protected abstract void remove(String contenderID); - /** Confirms the leadership with the given session ID and address. */ - protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress); + /** + * Confirms the leadership with the {@code leaderSessionID} and {@code leaderAddress} for the + * {@link LeaderContender} that is associated with the {@code contenderID}. + */ + protected abstract void confirmLeadership( + String contenderID, UUID leaderSessionID, String leaderAddress); /** - * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given - * session ID. + * Checks whether the {@code LeaderElectionService} has the leadership acquired for the {@code + * contenderID} and {@code leaderSessionID}. * - * @return {@code true} if the service has leadership with the passed session ID acquired; - * {@code false} otherwise. + * @return {@code true} if the service has leadership with the passed {@code leaderSessionID} + * acquired; {@code false} otherwise. */ - protected abstract boolean hasLeadership(UUID leaderSessionId); + protected abstract boolean hasLeadership(String contenderID, UUID leaderSessionID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java index e5cc3dbe550..da49687459e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - import java.util.UUID; /** @@ -31,36 +29,31 @@ import java.util.UUID; class DefaultLeaderElection implements LeaderElection { private final AbstractLeaderElectionService parentService; - @Nullable private LeaderContender leaderContender; + private final String contenderID; - DefaultLeaderElection(AbstractLeaderElectionService parentService) { + DefaultLeaderElection(AbstractLeaderElectionService parentService, String contenderID) { this.parentService = parentService; + this.contenderID = contenderID; } @Override public void startLeaderElection(LeaderContender contender) throws Exception { - Preconditions.checkState( - leaderContender == null, "There shouldn't be any LeaderContender registered, yet."); - leaderContender = Preconditions.checkNotNull(contender); - - parentService.register(leaderContender); + Preconditions.checkNotNull(contender); + parentService.register(contenderID, contender); } @Override public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { - parentService.confirmLeadership(leaderSessionID, leaderAddress); + parentService.confirmLeadership(contenderID, leaderSessionID, leaderAddress); } @Override public boolean hasLeadership(UUID leaderSessionId) { - return parentService.hasLeadership(leaderSessionId); + return parentService.hasLeadership(contenderID, leaderSessionId); } @Override public void close() throws Exception { - if (leaderContender != null) { - parentService.remove(leaderContender); - leaderContender = null; - } + parentService.remove(contenderID); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 2487bd15ad7..9c6f1d72242 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -55,14 +55,18 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService private final LeaderElectionDriverFactory leaderElectionDriverFactory; /** - * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is - * registered that participates in the leader election, yet. See {@link - * #register(LeaderContender)} and {@link #remove(LeaderContender)} for lifecycle management. + * {@code contenderID} being {@code null} indicates that no {@link LeaderContender} is + * registered that participates in the leader election, yet. See {@link #register(String, + * LeaderContender)} and {@link #remove(String)} for lifecycle management. * * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class * in a supporting IDE. */ @GuardedBy("lock") + private String contenderID; + + /** {@code leaderContender} is closely linked to the {@link #contenderID}. */ + @GuardedBy("lock") private LeaderContender leaderContender; /** @@ -161,18 +165,22 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService } @Override - protected void register(LeaderContender contender) throws Exception { + protected void register(String contenderID, LeaderContender contender) throws Exception { + checkNotNull(contenderID, "ContenderID must not be null."); checkNotNull(contender, "Contender must not be null."); synchronized (lock) { Preconditions.checkState( leaderContender == null, "Only one LeaderContender is allowed to be registered to this service."); + Preconditions.checkState( + this.contenderID == null, "The contenderID is only allowed to be set once."); Preconditions.checkState( running, "The DefaultLeaderElectionService should have established a connection to the backend before it's started."); leaderContender = contender; + this.contenderID = contenderID; LOG.info( "LeaderContender {} has been registered for {}.", @@ -188,17 +196,20 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService } @Override - protected final void remove(LeaderContender contender) { - Preconditions.checkArgument(contender == this.leaderContender); - LOG.info("Stopping DefaultLeaderElectionService."); - + protected final void remove(String contenderID) { synchronized (lock) { - if (leaderContender == null) { + if (this.contenderID == null) { LOG.debug( "The stop procedure was called on an already stopped DefaultLeaderElectionService instance. No action necessary."); return; } + LOG.info("Stopping DefaultLeaderElectionService for {}.", this.contenderID); + + Preconditions.checkNotNull( + leaderContender, + "There should be a LeaderContender registered under the given contenderID '%s'.", + this.contenderID); if (issuedLeaderSessionID != null) { notifyLeaderContenderOfLeadershipLoss(); LOG.debug( @@ -217,6 +228,7 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService "DefaultLeaderElectionService is stopping while not having the leadership acquired. No cleanup necessary."); } + this.contenderID = null; leaderContender = null; } } @@ -253,13 +265,19 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService } @Override - protected void confirmLeadership(UUID leaderSessionID, String leaderAddress) { - LOG.debug("Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress); + protected void confirmLeadership( + String contenderID, UUID leaderSessionID, String leaderAddress) { + Preconditions.checkArgument(contenderID.equals(this.contenderID)); + LOG.debug( + "The leader session of {} is confirmed with session ID {} for and address {}.", + this.contenderID, + leaderSessionID, + leaderAddress); checkNotNull(leaderSessionID); synchronized (lock) { - if (hasLeadership(leaderSessionID)) { + if (hasLeadership(contenderID, leaderSessionID)) { Preconditions.checkState( confirmedLeaderInformation.isEmpty(), "No confirmation should have happened, yet."); @@ -284,15 +302,16 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService } @Override - protected boolean hasLeadership(UUID leaderSessionId) { + protected boolean hasLeadership(String contenderID, UUID leaderSessionId) { synchronized (lock) { if (leaderElectionDriver != null) { - if (leaderContender != null) { + if (contenderID.equals(this.contenderID)) { return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID); } else { LOG.debug( - "hasLeadership is called after the service is stopped, returning false."); + "hasLeadership is called for contender ID '{}' while there is no contender registered under that ID in service, returning false.", + contenderID); return false; } } else { @@ -302,12 +321,17 @@ public class DefaultLeaderElectionService extends AbstractLeaderElectionService } } - /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */ + /** + * Returns the current leader session ID for the given {@code contenderID} or {@code null}, if + * the session wasn't confirmed. + */ @VisibleForTesting @Nullable - public UUID getLeaderSessionID() { + public UUID getLeaderSessionID(String contenderID) { synchronized (lock) { - return confirmedLeaderInformation.getLeaderSessionID(); + return contenderID.equals(this.contenderID) + ? confirmedLeaderInformation.getLeaderSessionID() + : null; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java index c83625a7db0..88b0b0460ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java @@ -38,6 +38,9 @@ public interface LeaderElectionService { /** * Creates a new {@link LeaderElection} instance that is registered to this {@code * LeaderElectionService} instance. + * + * @param contenderID a unique identifier that refers to the stored leader information that the + * newly created {@link LeaderElection} manages. */ - LeaderElection createLeaderElection(); + LeaderElection createLeaderElection(String contenderID); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 9b2a2e6af6d..a4df85e5f3c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -752,7 +752,9 @@ class JobMasterServiceLeadershipRunnerTest { .setJobMasterServiceProcessFunction( ignoredSessionId -> jobMasterServiceProcess) .build()) - .setLeaderElection(defaultLeaderElectionService.createLeaderElection()) + .setLeaderElection( + defaultLeaderElectionService.createLeaderElection( + "random-contender-id")) .build()) { jobManagerRunner.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index dcd12281654..a95fe2df66d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -54,11 +54,13 @@ class DefaultLeaderElectionServiceTest { testingContender.waitForLeader(); assertThat(testingContender.getDescription()).isEqualTo(TEST_URL); assertThat(testingContender.getLeaderSessionID()) - .isEqualTo(leaderElectionService.getLeaderSessionID()); + .isEqualTo( + leaderElectionService.getLeaderSessionID(contenderID)); final LeaderInformation expectedLeaderInformationInHaBackend = LeaderInformation.known( - leaderElectionService.getLeaderSessionID(), TEST_URL); + leaderElectionService.getLeaderSessionID(contenderID), + TEST_URL); assertThat(testingLeaderElectionDriver.getLeaderInformation()) .as( "The HA backend should have its leader information updated.") @@ -68,7 +70,8 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.notLeader(); testingContender.waitForRevokeLeader(); assertThat(testingContender.getLeaderSessionID()).isNull(); - assertThat(leaderElectionService.getLeaderSessionID()).isNull(); + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) + .isNull(); assertThat(testingLeaderElectionDriver.getLeaderInformation()) .as( "External storage is not touched by the leader session because the leadership is already lost.") @@ -156,7 +159,8 @@ class DefaultLeaderElectionServiceTest { Executors.newDirectExecutorService())) { testInstance.startLeaderElectionBackend(); - final LeaderElection leaderElection = testInstance.createLeaderElection(); + final LeaderElection leaderElection = + testInstance.createLeaderElection(createRandomContenderID()); final TestingContender testingContender = new TestingContender("unused-address", leaderElection); testingContender.startLeaderElection(); @@ -181,7 +185,7 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.isLeader(expectedSessionID); try (LeaderElection anotherLeaderElection = - leaderElectionService.createLeaderElection()) { + leaderElectionService.createLeaderElection(contenderID)) { final TestingContender testingContender = new TestingContender(TEST_URL, anotherLeaderElection); testingContender.startLeaderElection(); @@ -219,7 +223,8 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.isLeader(expectedSessionID); executorService.trigger(); - leaderElection = leaderElectionService.createLeaderElection(); + leaderElection = + leaderElectionService.createLeaderElection(contenderID); final TestingContender contender = new TestingContender("unused-address", leaderElection); contender.startLeaderElection(); @@ -251,7 +256,8 @@ class DefaultLeaderElectionServiceTest { driver.isLeader(); - final LeaderElection leaderElection = testInstance.createLeaderElection(); + final LeaderElection leaderElection = + testInstance.createLeaderElection(createRandomContenderID()); final TestingContender contender = new TestingContender("unused-address", leaderElection); contender.startLeaderElection(); @@ -278,7 +284,8 @@ class DefaultLeaderElectionServiceTest { driver.isLeader(); - final LeaderElection leaderElection = testInstance.createLeaderElection(); + final LeaderElection leaderElection = + testInstance.createLeaderElection(createRandomContenderID()); leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build()); leaderElection.close(); @@ -290,7 +297,8 @@ class DefaultLeaderElectionServiceTest { try (final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService( new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory())) { - final LeaderElection leaderElection = leaderElectionService.createLeaderElection(); + final LeaderElection leaderElection = + leaderElectionService.createLeaderElection(createRandomContenderID()); assertThatThrownBy( () -> new TestingContender("unused-address", leaderElection) @@ -326,7 +334,8 @@ class DefaultLeaderElectionServiceTest { testingContender.waitForLeader(); assertThat(testingContender.getLeaderSessionID()).isNotNull(); - assertThat(leaderElectionService.getLeaderSessionID()).isNotNull(); + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) + .isNotNull(); assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty()) .isFalse(); @@ -336,7 +345,7 @@ class DefaultLeaderElectionServiceTest { .as( "The LeaderContender should have been informed about the leadership loss.") .isNull(); - assertThat(leaderElectionService.getLeaderSessionID()) + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) .as( "The LeaderElectionService should have its internal state cleaned.") .isNull(); @@ -358,7 +367,8 @@ class DefaultLeaderElectionServiceTest { final LeaderInformation expectedLeader = LeaderInformation.known( - leaderElectionService.getLeaderSessionID(), TEST_URL); + leaderElectionService.getLeaderSessionID(contenderID), + TEST_URL); // Leader information changed on external storage. It should be // corrected. @@ -386,9 +396,13 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.isLeader(expectedSessionID); - assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + assertThat( + leaderElectionService.hasLeadership( + contenderID, expectedSessionID)) .isFalse(); - assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + assertThat( + leaderElectionService.hasLeadership( + contenderID, UUID.randomUUID())) .isFalse(); }); } @@ -406,9 +420,13 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.isLeader(expectedSessionID); executorService.trigger(); - assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + assertThat( + leaderElectionService.hasLeadership( + contenderID, expectedSessionID)) .isTrue(); - assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + assertThat( + leaderElectionService.hasLeadership( + contenderID, UUID.randomUUID())) .isFalse(); }); } @@ -427,7 +445,9 @@ class DefaultLeaderElectionServiceTest { executorService.trigger(); testingLeaderElectionDriver.notLeader(); - assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + assertThat( + leaderElectionService.hasLeadership( + contenderID, expectedSessionID)) .as( "No operation should be handled anymore after the HA backend " + "indicated leadership loss even if the onRevokeLeadership wasn't " @@ -435,7 +455,9 @@ class DefaultLeaderElectionServiceTest { + "the leadership in the meantime already based on the HA " + "backend's decision.") .isFalse(); - assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + assertThat( + leaderElectionService.hasLeadership( + contenderID, UUID.randomUUID())) .isFalse(); }); } @@ -456,9 +478,13 @@ class DefaultLeaderElectionServiceTest { testingLeaderElectionDriver.notLeader(); executorService.trigger(); - assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + assertThat( + leaderElectionService.hasLeadership( + contenderID, expectedSessionID)) .isFalse(); - assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + assertThat( + leaderElectionService.hasLeadership( + contenderID, UUID.randomUUID())) .isFalse(); }); } @@ -477,7 +503,9 @@ class DefaultLeaderElectionServiceTest { leaderElection.close(); - assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + assertThat( + leaderElectionService.hasLeadership( + contenderID, expectedSessionID)) .isFalse(); }); } @@ -510,7 +538,7 @@ class DefaultLeaderElectionServiceTest { leaderElection.close(); testingLeaderElectionDriver.isLeader(); - assertThat(leaderElectionService.getLeaderSessionID()) + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) .as( "The grant event shouldn't have been processed by the LeaderElectionService.") .isNull(); @@ -548,7 +576,8 @@ class DefaultLeaderElectionServiceTest { runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); - final UUID oldSessionId = leaderElectionService.getLeaderSessionID(); + final UUID oldSessionId = + leaderElectionService.getLeaderSessionID(contenderID); assertThat(testingContender.getLeaderSessionID()) .isEqualTo(oldSessionId); @@ -571,12 +600,12 @@ class DefaultLeaderElectionServiceTest { () -> { testingLeaderElectionDriver.isLeader(); final UUID currentLeaderSessionId = - leaderElectionService.getLeaderSessionID(); + leaderElectionService.getLeaderSessionID(contenderID); assertThat(currentLeaderSessionId).isNotNull(); // Old confirm call should be ignored. leaderElection.confirmLeadership(UUID.randomUUID(), TEST_URL); - assertThat(leaderElectionService.getLeaderSessionID()) + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) .isEqualTo(currentLeaderSessionId); }); } @@ -591,7 +620,7 @@ class DefaultLeaderElectionServiceTest { () -> { testingLeaderElectionDriver.isLeader(); final UUID currentLeaderSessionId = - leaderElectionService.getLeaderSessionID(); + leaderElectionService.getLeaderSessionID(contenderID); assertThat(currentLeaderSessionId).isNotNull(); testingLeaderElectionDriver.notLeader(); @@ -599,7 +628,8 @@ class DefaultLeaderElectionServiceTest { // Old confirm call should be ignored. leaderElection.confirmLeadership(currentLeaderSessionId, TEST_URL); - assertThat(leaderElectionService.getLeaderSessionID()).isNull(); + assertThat(leaderElectionService.getLeaderSessionID(contenderID)) + .isNull(); }); } }; @@ -656,7 +686,8 @@ class DefaultLeaderElectionServiceTest { new DefaultLeaderElectionService(testingLeaderElectionDriverFactory); leaderElectionService.startLeaderElectionBackend(); - final LeaderElection leaderElection = leaderElectionService.createLeaderElection(); + final LeaderElection leaderElection = + leaderElectionService.createLeaderElection(createRandomContenderID()); final TestingContender testingContender = new TestingContender(TEST_URL, leaderElection); testingContender.startLeaderElection(); @@ -697,12 +728,15 @@ class DefaultLeaderElectionServiceTest { (leaderElectionEventHandler, errorHandler) -> driver); testInstance.startLeaderElectionBackend(); + final String contenderID = "contender-id"; final String address = "leader-address"; - final LeaderElection leaderElection = testInstance.createLeaderElection(); + final LeaderElection leaderElection = testInstance.createLeaderElection(contenderID); leaderElection.startLeaderElection( TestingGenericLeaderContender.newBuilder() .setGrantLeadershipConsumer( - sessionID -> testInstance.confirmLeadership(sessionID, address)) + sessionID -> + testInstance.confirmLeadership( + contenderID, sessionID, address)) .build()); // initial messages to initialize usedLeaderSessionID and confirmedLeaderInformation @@ -761,7 +795,8 @@ class DefaultLeaderElectionServiceTest { final DefaultLeaderElectionService testInstance = new DefaultLeaderElectionService(driverFactory); testInstance.startLeaderElectionBackend(); - final LeaderElection leaderElection = testInstance.createLeaderElection(); + final LeaderElection leaderElection = + testInstance.createLeaderElection(createRandomContenderID()); leaderElection.startLeaderElection(contender); final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver(); @@ -775,8 +810,14 @@ class DefaultLeaderElectionServiceTest { testInstance.close(); } + private static String createRandomContenderID() { + return String.format("contender-id-%s", UUID.randomUUID()); + } + private static class Context { + final String contenderID = createRandomContenderID(); + DefaultLeaderElectionService leaderElectionService; TestingContender testingContender; @@ -807,7 +848,7 @@ class DefaultLeaderElectionServiceTest { driverFactory, leaderEventOperationExecutor); leaderElectionService.startLeaderElectionBackend(); - leaderElection = leaderElectionService.createLeaderElection(); + leaderElection = leaderElectionService.createLeaderElection(contenderID); testingContender = new TestingContender(TEST_URL, leaderElection); testingContender.startLeaderElection(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java index 3e1de05856a..c5346a27854 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java @@ -18,34 +18,44 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.TriConsumer; import org.junit.jupiter.api.Test; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class DefaultLeaderElectionTest { + private static final String DEFAULT_TEST_CONTENDER_ID = "test-contender-id"; + @Test void testContenderRegistration() throws Exception { + final AtomicReference<String> contenderIDRef = new AtomicReference<>(); final AtomicReference<LeaderContender> contenderRef = new AtomicReference<>(); final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() - .setRegisterConsumer(contenderRef::set) + .setRegisterConsumer( + (actualContenderID, actualContender) -> { + contenderIDRef.set(actualContenderID); + contenderRef.set(actualContender); + }) .build(); - try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService)) { + try (final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID)) { final LeaderContender contender = TestingGenericLeaderContender.newBuilder().build(); testInstance.startLeaderElection(contender); + assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID); assertThat(contenderRef.get()).isSameAs(contender); } } @@ -54,7 +64,8 @@ class DefaultLeaderElectionTest { void testContenderRegistrationNull() throws Exception { try (final DefaultLeaderElection testInstance = new DefaultLeaderElection( - TestingAbstractLeaderElectionService.newBuilder().build())) { + TestingAbstractLeaderElectionService.newBuilder().build(), + DEFAULT_TEST_CONTENDER_ID)) { assertThatThrownBy(() -> testInstance.startLeaderElection(null)) .isInstanceOf(NullPointerException.class); } @@ -67,11 +78,12 @@ class DefaultLeaderElectionTest { final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() .setRegisterConsumer( - contender -> { + (actualContenderID, actualContender) -> { throw expectedException; }) .build(); - try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService)) { + try (final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID)) { assertThatThrownBy( () -> testInstance.startLeaderElection( @@ -82,60 +94,64 @@ class DefaultLeaderElectionTest { @Test void testLeaderConfirmation() throws Exception { + final AtomicReference<String> contenderIDRef = new AtomicReference<>(); final AtomicReference<UUID> leaderSessionIDRef = new AtomicReference<>(); final AtomicReference<String> leaderAddressRef = new AtomicReference<>(); final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() .setConfirmLeadershipConsumer( - (leaderSessionID, address) -> { + (contenderID, leaderSessionID, address) -> { + contenderIDRef.set(contenderID); leaderSessionIDRef.set(leaderSessionID); leaderAddressRef.set(address); }) .build(); - try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService)) { + try (final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID)) { final UUID expectedLeaderSessionID = UUID.randomUUID(); final String expectedAddress = "random-address"; testInstance.confirmLeadership(expectedLeaderSessionID, expectedAddress); - assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID); - assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress); + assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID); + assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); + assertThat(leaderAddressRef).hasValue(expectedAddress); } } @Test void testClose() throws Exception { - final CompletableFuture<LeaderContender> actualContender = new CompletableFuture<>(); + final CompletableFuture<String> actualContenderID = new CompletableFuture<>(); final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() - .setRegisterConsumer(ignoredContender -> {}) - .setRemoveConsumer(actualContender::complete) + .setRegisterConsumer((ignoredContenderID, ignoredContender) -> {}) + .setRemoveConsumer(actualContenderID::complete) .build(); - final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService); + final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID); - final LeaderContender contender = TestingGenericLeaderContender.newBuilder().build(); - testInstance.startLeaderElection(contender); + testInstance.startLeaderElection(TestingGenericLeaderContender.newBuilder().build()); testInstance.close(); - assertThat(actualContender).isCompletedWithValue(contender); + assertThat(actualContenderID).isCompletedWithValue(DEFAULT_TEST_CONTENDER_ID); } @Test void testCloseWithoutStart() throws Exception { - final CompletableFuture<LeaderContender> actualContender = new CompletableFuture<>(); + final CompletableFuture<String> actualContenderID = new CompletableFuture<>(); final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() - .setRemoveConsumer(actualContender::complete) + .setRemoveConsumer(actualContenderID::complete) .build(); - final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService); + final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID); testInstance.close(); - assertThat(actualContender) - .as( - "No removal should be triggered if there's no contender that need to be deregistered.") - .isNotDone(); + assertThatFuture(actualContenderID) + .eventuallySucceeds() + .isEqualTo(DEFAULT_TEST_CONTENDER_ID); } @Test @@ -149,37 +165,41 @@ class DefaultLeaderElectionTest { } private void testHasLeadership(boolean expectedReturnValue) throws Exception { + final AtomicReference<String> contenderIDRef = new AtomicReference<>(); final AtomicReference<UUID> leaderSessionIDRef = new AtomicReference<>(); final AbstractLeaderElectionService parentService = TestingAbstractLeaderElectionService.newBuilder() .setHasLeadershipFunction( - leaderSessionID -> { - leaderSessionIDRef.set(leaderSessionID); + (actualContenderID, actualLeaderSessionID) -> { + contenderIDRef.set(actualContenderID); + leaderSessionIDRef.set(actualLeaderSessionID); return expectedReturnValue; }) .build(); - try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService)) { + try (final DefaultLeaderElection testInstance = + new DefaultLeaderElection(parentService, DEFAULT_TEST_CONTENDER_ID)) { final UUID expectedLeaderSessionID = UUID.randomUUID(); assertThat(testInstance.hasLeadership(expectedLeaderSessionID)) .isEqualTo(expectedReturnValue); - assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID); + assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID); + assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); } } private static class TestingAbstractLeaderElectionService extends AbstractLeaderElectionService { - private final ThrowingConsumer<LeaderContender, Exception> registerConsumer; - private final Consumer<LeaderContender> removeConsumer; - private final BiConsumer<UUID, String> confirmLeadershipConsumer; - private final Function<UUID, Boolean> hasLeadershipFunction; + private final BiConsumerWithException<String, LeaderContender, Exception> registerConsumer; + private final Consumer<String> removeConsumer; + private final TriConsumer<String, UUID, String> confirmLeadershipConsumer; + private final BiFunction<String, UUID, Boolean> hasLeadershipFunction; private TestingAbstractLeaderElectionService( - ThrowingConsumer<LeaderContender, Exception> registerConsumer, - Consumer<LeaderContender> removeConsumer, - BiConsumer<UUID, String> confirmLeadershipConsumer, - Function<UUID, Boolean> hasLeadershipFunction) { + BiConsumerWithException<String, LeaderContender, Exception> registerConsumer, + Consumer<String> removeConsumer, + TriConsumer<String, UUID, String> confirmLeadershipConsumer, + BiFunction<String, UUID, Boolean> hasLeadershipFunction) { super(); this.registerConsumer = registerConsumer; @@ -189,39 +209,40 @@ class DefaultLeaderElectionTest { } @Override - protected void register(LeaderContender contender) throws Exception { - registerConsumer.accept(contender); + protected void register(String contenderID, LeaderContender contender) throws Exception { + registerConsumer.accept(contenderID, contender); } @Override - protected void remove(LeaderContender contender) { - removeConsumer.accept(contender); + protected void remove(String contenderID) { + removeConsumer.accept(contenderID); } @Override - protected void confirmLeadership(UUID leaderSessionID, String leaderAddress) { - confirmLeadershipConsumer.accept(leaderSessionID, leaderAddress); + protected void confirmLeadership( + String contenderID, UUID leaderSessionID, String leaderAddress) { + confirmLeadershipConsumer.accept(contenderID, leaderSessionID, leaderAddress); } @Override - protected boolean hasLeadership(UUID leaderSessionId) { - return hasLeadershipFunction.apply(leaderSessionId); + protected boolean hasLeadership(String contenderID, UUID leaderSessionId) { + return hasLeadershipFunction.apply(contenderID, leaderSessionId); } public static Builder newBuilder() { return new Builder() .setRegisterConsumer( - contender -> { + (contenderID, contender) -> { throw new UnsupportedOperationException("register not supported"); }) - .setRemoveConsumer(contender -> {}) + .setRemoveConsumer(contenderID -> {}) .setConfirmLeadershipConsumer( - (leaderSessionID, address) -> { + (contenderID, leaderSessionID, address) -> { throw new UnsupportedOperationException( "confirmLeadership not supported"); }) .setHasLeadershipFunction( - leaderSessionID -> { + (contenderID, leaderSessionID) -> { throw new UnsupportedOperationException( "hasLeadership not supported"); }); @@ -229,33 +250,32 @@ class DefaultLeaderElectionTest { private static class Builder { - private ThrowingConsumer<LeaderContender, Exception> registerConsumer = - ignoredContender -> {}; - private Consumer<LeaderContender> removeConsumer; - private BiConsumer<UUID, String> confirmLeadershipConsumer = - (ignoredSessionID, ignoredAddress) -> {}; - private Function<UUID, Boolean> hasLeadershipFunction = ignoredSessiondID -> false; + private BiConsumerWithException<String, LeaderContender, Exception> registerConsumer; + private Consumer<String> removeConsumer; + private TriConsumer<String, UUID, String> confirmLeadershipConsumer; + private BiFunction<String, UUID, Boolean> hasLeadershipFunction; private Builder() {} public Builder setRegisterConsumer( - ThrowingConsumer<LeaderContender, Exception> registerConsumer) { + BiConsumerWithException<String, LeaderContender, Exception> registerConsumer) { this.registerConsumer = registerConsumer; return this; } - public Builder setRemoveConsumer(Consumer<LeaderContender> removeConsumer) { + public Builder setRemoveConsumer(Consumer<String> removeConsumer) { this.removeConsumer = removeConsumer; return this; } public Builder setConfirmLeadershipConsumer( - BiConsumer<UUID, String> confirmLeadershipConsumer) { + TriConsumer<String, UUID, String> confirmLeadershipConsumer) { this.confirmLeadershipConsumer = confirmLeadershipConsumer; return this; } - public Builder setHasLeadershipFunction(Function<UUID, Boolean> hasLeadershipFunction) { + public Builder setHasLeadershipFunction( + BiFunction<String, UUID, Boolean> hasLeadershipFunction) { this.hasLeadershipFunction = hasLeadershipFunction; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index efa18e6ab83..9cc66385187 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -204,7 +204,7 @@ public class LeaderElectionTest { @Override public LeaderElection createLeaderElection() { - return leaderElectionService.createLeaderElection(); + return leaderElectionService.createLeaderElection("random-contender-id"); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java index 24d0532768e..90d729814d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -146,7 +146,8 @@ class ZooKeeperLeaderElectionConnectionHandlingTest { client.getConnectionStateListenable().addListener(connectionStateListener); final TestingContender contender = new TestingContender(); - try (LeaderElection leaderElection = leaderElectionService.createLeaderElection()) { + try (LeaderElection leaderElection = + leaderElectionService.createLeaderElection("random-contender-id")) { leaderElection.startLeaderElection(contender); contender.awaitGrantLeadership(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index c0047a8a58d..c7d58f3dcd9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -189,7 +189,8 @@ class ZooKeeperLeaderElectionTest { for (int i = 0; i < num; i++) { leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient()); - leaderElections[i] = leaderElectionService[i].createLeaderElection(); + leaderElections[i] = + leaderElectionService[i].createLeaderElection("random-contender-id"); contenders[i] = new TestingContender(createAddress(i), leaderElections[i]); LOG.debug("Start leader election service for contender #{}.", i); @@ -270,6 +271,7 @@ class ZooKeeperLeaderElectionTest { int num = 3; int numTries = 30; + final String contenderID = "random-contender-id"; DefaultLeaderElectionService[] leaderElectionService = new DefaultLeaderElectionService[num]; LeaderElection[] leaderElections = new LeaderElection[num]; @@ -287,7 +289,7 @@ class ZooKeeperLeaderElectionTest { for (int i = 0; i < num; i++) { leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient()); - leaderElections[i] = leaderElectionService[i].createLeaderElection(); + leaderElections[i] = leaderElectionService[i].createLeaderElection(contenderID); contenders[i] = new TestingContender(LEADER_ADDRESS + "_" + i + "_0", leaderElections[i]); @@ -320,7 +322,8 @@ class ZooKeeperLeaderElectionTest { // create new leader election service which takes part in the leader election leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient()); - leaderElections[index] = leaderElectionService[index].createLeaderElection(); + leaderElections[index] = + leaderElectionService[index].createLeaderElection(contenderID); contenders[index] = new TestingContender(