This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 62b792d58c4 [FLINK-31781][runtime] Introduces contender ID in 
LeaderElectionService interface
62b792d58c4 is described below

commit 62b792d58c4f8d5b494b50daad2e5fc5047af330
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Tue May 16 18:42:49 2023 +0200

    [FLINK-31781][runtime] Introduces contender ID in LeaderElectionService 
interface
    
    Signed-off-by: Matthias Pohl <matthias.p...@aiven.io>
---
 .../highavailability/AbstractHaServices.java       |   7 +-
 .../AbstractLeaderElectionService.java             |  36 +++---
 .../leaderelection/DefaultLeaderElection.java      |  23 ++--
 .../DefaultLeaderElectionService.java              |  60 ++++++---
 .../leaderelection/LeaderElectionService.java      |   5 +-
 .../JobMasterServiceLeadershipRunnerTest.java      |   4 +-
 .../DefaultLeaderElectionServiceTest.java          | 105 +++++++++++-----
 .../leaderelection/DefaultLeaderElectionTest.java  | 140 ++++++++++++---------
 .../runtime/leaderelection/LeaderElectionTest.java |   2 +-
 ...KeeperLeaderElectionConnectionHandlingTest.java |   3 +-
 .../ZooKeeperLeaderElectionTest.java               |   9 +-
 11 files changed, 247 insertions(+), 147 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 4c24110c42a..401e63cc959 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -247,7 +247,12 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
         leaderElectionService.startLeaderElectionBackend();
 
         closeableRegistry.registerCloseable(leaderElectionService);
-        return leaderElectionService.createLeaderElection();
+        // the leaderName which is passed as a contenderID here is not 
actively used within the
+        // DefaultLeaderElectionService for now - this will change in a future 
step where the
+        // DefaultLeaderElectionService will start to use 
MultipleComponentLeaderElectionDriver
+        // instead of LeaderElectionDriver and fully replace
+        // DefaultMultipleComponentLeaderElectionService (FLINK-31783)
+        return 
leaderElectionService.createLeaderElection("unused-contender-id");
     }
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
index 0abad419329..eef2d731fa7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
@@ -27,29 +27,37 @@ import java.util.UUID;
 public abstract class AbstractLeaderElectionService implements 
LeaderElectionService {
 
     @Override
-    public LeaderElection createLeaderElection() {
-        return new DefaultLeaderElection(this);
+    public LeaderElection createLeaderElection(String contenderID) {
+        return new DefaultLeaderElection(this, contenderID);
     }
 
     /**
-     * Registers the given {@link LeaderContender} with the underlying {@code
-     * LeaderElectionService}. Leadership changes are starting to be reported 
to the {@code
+     * Registers the {@link LeaderContender} under the {@code contenderID} 
with the underlying
+     * {@code LeaderElectionService}. Leadership changes are starting to be 
reported to the {@code
      * LeaderContender}.
      */
-    protected abstract void register(LeaderContender contender) throws 
Exception;
+    protected abstract void register(String contenderID, LeaderContender 
contender)
+            throws Exception;
 
-    /** Removes the passed {@code LeaderContender} from the {@code 
LeaderElectionService}. */
-    protected abstract void remove(LeaderContender contender);
+    /**
+     * Removes the {@code LeaderContender} from the {@code 
LeaderElectionService} that is associated
+     * with the {@code contenderID}.
+     */
+    protected abstract void remove(String contenderID);
 
-    /** Confirms the leadership with the given session ID and address. */
-    protected abstract void confirmLeadership(UUID leaderSessionID, String 
leaderAddress);
+    /**
+     * Confirms the leadership with the {@code leaderSessionID} and {@code 
leaderAddress} for the
+     * {@link LeaderContender} that is associated with the {@code contenderID}.
+     */
+    protected abstract void confirmLeadership(
+            String contenderID, UUID leaderSessionID, String leaderAddress);
 
     /**
-     * Checks whether the {@code LeaderElectionService} has the leadership 
acquired for the given
-     * session ID.
+     * Checks whether the {@code LeaderElectionService} has the leadership 
acquired for the {@code
+     * contenderID} and {@code leaderSessionID}.
      *
-     * @return {@code true} if the service has leadership with the passed 
session ID acquired;
-     *     {@code false} otherwise.
+     * @return {@code true} if the service has leadership with the passed 
{@code leaderSessionID}
+     *     acquired; {@code false} otherwise.
      */
-    protected abstract boolean hasLeadership(UUID leaderSessionId);
+    protected abstract boolean hasLeadership(String contenderID, UUID 
leaderSessionID);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
index e5cc3dbe550..da49687459e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.util.UUID;
 
 /**
@@ -31,36 +29,31 @@ import java.util.UUID;
 class DefaultLeaderElection implements LeaderElection {
 
     private final AbstractLeaderElectionService parentService;
-    @Nullable private LeaderContender leaderContender;
+    private final String contenderID;
 
-    DefaultLeaderElection(AbstractLeaderElectionService parentService) {
+    DefaultLeaderElection(AbstractLeaderElectionService parentService, String 
contenderID) {
         this.parentService = parentService;
+        this.contenderID = contenderID;
     }
 
     @Override
     public void startLeaderElection(LeaderContender contender) throws 
Exception {
-        Preconditions.checkState(
-                leaderContender == null, "There shouldn't be any 
LeaderContender registered, yet.");
-        leaderContender = Preconditions.checkNotNull(contender);
-
-        parentService.register(leaderContender);
+        Preconditions.checkNotNull(contender);
+        parentService.register(contenderID, contender);
     }
 
     @Override
     public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
-        parentService.confirmLeadership(leaderSessionID, leaderAddress);
+        parentService.confirmLeadership(contenderID, leaderSessionID, 
leaderAddress);
     }
 
     @Override
     public boolean hasLeadership(UUID leaderSessionId) {
-        return parentService.hasLeadership(leaderSessionId);
+        return parentService.hasLeadership(contenderID, leaderSessionId);
     }
 
     @Override
     public void close() throws Exception {
-        if (leaderContender != null) {
-            parentService.remove(leaderContender);
-            leaderContender = null;
-        }
+        parentService.remove(contenderID);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 2487bd15ad7..9c6f1d72242 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -55,14 +55,18 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
     /**
-     * {@code leaderContender} being {@code null} indicates that no {@link 
LeaderContender} is
-     * registered that participates in the leader election, yet. See {@link
-     * #register(LeaderContender)} and {@link #remove(LeaderContender)} for 
lifecycle management.
+     * {@code contenderID} being {@code null} indicates that no {@link 
LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link 
#register(String,
+     * LeaderContender)} and {@link #remove(String)} for lifecycle management.
      *
      * <p>{@code @Nullable} isn't used here to avoid having multiple warnings 
spread over this class
      * in a supporting IDE.
      */
     @GuardedBy("lock")
+    private String contenderID;
+
+    /** {@code leaderContender} is closely linked to the {@link #contenderID}. 
*/
+    @GuardedBy("lock")
     private LeaderContender leaderContender;
 
     /**
@@ -161,18 +165,22 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    protected void register(LeaderContender contender) throws Exception {
+    protected void register(String contenderID, LeaderContender contender) 
throws Exception {
+        checkNotNull(contenderID, "ContenderID must not be null.");
         checkNotNull(contender, "Contender must not be null.");
 
         synchronized (lock) {
             Preconditions.checkState(
                     leaderContender == null,
                     "Only one LeaderContender is allowed to be registered to 
this service.");
+            Preconditions.checkState(
+                    this.contenderID == null, "The contenderID is only allowed 
to be set once.");
             Preconditions.checkState(
                     running,
                     "The DefaultLeaderElectionService should have established 
a connection to the backend before it's started.");
 
             leaderContender = contender;
+            this.contenderID = contenderID;
 
             LOG.info(
                     "LeaderContender {} has been registered for {}.",
@@ -188,17 +196,20 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    protected final void remove(LeaderContender contender) {
-        Preconditions.checkArgument(contender == this.leaderContender);
-        LOG.info("Stopping DefaultLeaderElectionService.");
-
+    protected final void remove(String contenderID) {
         synchronized (lock) {
-            if (leaderContender == null) {
+            if (this.contenderID == null) {
                 LOG.debug(
                         "The stop procedure was called on an already stopped 
DefaultLeaderElectionService instance. No action necessary.");
                 return;
             }
 
+            LOG.info("Stopping DefaultLeaderElectionService for {}.", 
this.contenderID);
+
+            Preconditions.checkNotNull(
+                    leaderContender,
+                    "There should be a LeaderContender registered under the 
given contenderID '%s'.",
+                    this.contenderID);
             if (issuedLeaderSessionID != null) {
                 notifyLeaderContenderOfLeadershipLoss();
                 LOG.debug(
@@ -217,6 +228,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
                         "DefaultLeaderElectionService is stopping while not 
having the leadership acquired. No cleanup necessary.");
             }
 
+            this.contenderID = null;
             leaderContender = null;
         }
     }
@@ -253,13 +265,19 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    protected void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
-        LOG.debug("Confirm leader session ID {} for leader {}.", 
leaderSessionID, leaderAddress);
+    protected void confirmLeadership(
+            String contenderID, UUID leaderSessionID, String leaderAddress) {
+        Preconditions.checkArgument(contenderID.equals(this.contenderID));
+        LOG.debug(
+                "The leader session of {} is confirmed with session ID {} for 
and address {}.",
+                this.contenderID,
+                leaderSessionID,
+                leaderAddress);
 
         checkNotNull(leaderSessionID);
 
         synchronized (lock) {
-            if (hasLeadership(leaderSessionID)) {
+            if (hasLeadership(contenderID, leaderSessionID)) {
                 Preconditions.checkState(
                         confirmedLeaderInformation.isEmpty(),
                         "No confirmation should have happened, yet.");
@@ -284,15 +302,16 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    protected boolean hasLeadership(UUID leaderSessionId) {
+    protected boolean hasLeadership(String contenderID, UUID leaderSessionId) {
         synchronized (lock) {
             if (leaderElectionDriver != null) {
-                if (leaderContender != null) {
+                if (contenderID.equals(this.contenderID)) {
                     return leaderElectionDriver.hasLeadership()
                             && leaderSessionId.equals(issuedLeaderSessionID);
                 } else {
                     LOG.debug(
-                            "hasLeadership is called after the service is 
stopped, returning false.");
+                            "hasLeadership is called for contender ID '{}' 
while there is no contender registered under that ID in service, returning 
false.",
+                            contenderID);
                     return false;
                 }
             } else {
@@ -302,12 +321,17 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
         }
     }
 
-    /** Returns the current leader session ID or {@code null}, if the session 
wasn't confirmed. */
+    /**
+     * Returns the current leader session ID for the given {@code contenderID} 
or {@code null}, if
+     * the session wasn't confirmed.
+     */
     @VisibleForTesting
     @Nullable
-    public UUID getLeaderSessionID() {
+    public UUID getLeaderSessionID(String contenderID) {
         synchronized (lock) {
-            return confirmedLeaderInformation.getLeaderSessionID();
+            return contenderID.equals(this.contenderID)
+                    ? confirmedLeaderInformation.getLeaderSessionID()
+                    : null;
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index c83625a7db0..88b0b0460ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -38,6 +38,9 @@ public interface LeaderElectionService {
     /**
      * Creates a new {@link LeaderElection} instance that is registered to 
this {@code
      * LeaderElectionService} instance.
+     *
+     * @param contenderID a unique identifier that refers to the stored leader 
information that the
+     *     newly created {@link LeaderElection} manages.
      */
-    LeaderElection createLeaderElection();
+    LeaderElection createLeaderElection(String contenderID);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 9b2a2e6af6d..a4df85e5f3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -752,7 +752,9 @@ class JobMasterServiceLeadershipRunnerTest {
                                         .setJobMasterServiceProcessFunction(
                                                 ignoredSessionId -> 
jobMasterServiceProcess)
                                         .build())
-                        
.setLeaderElection(defaultLeaderElectionService.createLeaderElection())
+                        .setLeaderElection(
+                                
defaultLeaderElectionService.createLeaderElection(
+                                        "random-contender-id"))
                         .build()) {
             jobManagerRunner.start();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index dcd12281654..a95fe2df66d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -54,11 +54,13 @@ class DefaultLeaderElectionServiceTest {
                             testingContender.waitForLeader();
                             
assertThat(testingContender.getDescription()).isEqualTo(TEST_URL);
                             assertThat(testingContender.getLeaderSessionID())
-                                    
.isEqualTo(leaderElectionService.getLeaderSessionID());
+                                    .isEqualTo(
+                                            
leaderElectionService.getLeaderSessionID(contenderID));
 
                             final LeaderInformation 
expectedLeaderInformationInHaBackend =
                                     LeaderInformation.known(
-                                            
leaderElectionService.getLeaderSessionID(), TEST_URL);
+                                            
leaderElectionService.getLeaderSessionID(contenderID),
+                                            TEST_URL);
                             
assertThat(testingLeaderElectionDriver.getLeaderInformation())
                                     .as(
                                             "The HA backend should have its 
leader information updated.")
@@ -68,7 +70,8 @@ class DefaultLeaderElectionServiceTest {
                             testingLeaderElectionDriver.notLeader();
                             testingContender.waitForRevokeLeader();
                             
assertThat(testingContender.getLeaderSessionID()).isNull();
-                            
assertThat(leaderElectionService.getLeaderSessionID()).isNull();
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
+                                    .isNull();
                             
assertThat(testingLeaderElectionDriver.getLeaderInformation())
                                     .as(
                                             "External storage is not touched 
by the leader session because the leadership is already lost.")
@@ -156,7 +159,8 @@ class DefaultLeaderElectionServiceTest {
                                 Executors.newDirectExecutorService())) {
             testInstance.startLeaderElectionBackend();
 
-            final LeaderElection leaderElection = 
testInstance.createLeaderElection();
+            final LeaderElection leaderElection =
+                    
testInstance.createLeaderElection(createRandomContenderID());
             final TestingContender testingContender =
                     new TestingContender("unused-address", leaderElection);
             testingContender.startLeaderElection();
@@ -181,7 +185,7 @@ class DefaultLeaderElectionServiceTest {
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
 
                             try (LeaderElection anotherLeaderElection =
-                                    
leaderElectionService.createLeaderElection()) {
+                                    
leaderElectionService.createLeaderElection(contenderID)) {
                                 final TestingContender testingContender =
                                         new TestingContender(TEST_URL, 
anotherLeaderElection);
                                 testingContender.startLeaderElection();
@@ -219,7 +223,8 @@ class DefaultLeaderElectionServiceTest {
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
                             executorService.trigger();
 
-                            leaderElection = 
leaderElectionService.createLeaderElection();
+                            leaderElection =
+                                    
leaderElectionService.createLeaderElection(contenderID);
                             final TestingContender contender =
                                     new TestingContender("unused-address", 
leaderElection);
                             contender.startLeaderElection();
@@ -251,7 +256,8 @@ class DefaultLeaderElectionServiceTest {
 
             driver.isLeader();
 
-            final LeaderElection leaderElection = 
testInstance.createLeaderElection();
+            final LeaderElection leaderElection =
+                    
testInstance.createLeaderElection(createRandomContenderID());
             final TestingContender contender =
                     new TestingContender("unused-address", leaderElection);
             contender.startLeaderElection();
@@ -278,7 +284,8 @@ class DefaultLeaderElectionServiceTest {
 
             driver.isLeader();
 
-            final LeaderElection leaderElection = 
testInstance.createLeaderElection();
+            final LeaderElection leaderElection =
+                    
testInstance.createLeaderElection(createRandomContenderID());
             
leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
 
             leaderElection.close();
@@ -290,7 +297,8 @@ class DefaultLeaderElectionServiceTest {
         try (final DefaultLeaderElectionService leaderElectionService =
                 new DefaultLeaderElectionService(
                         new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory())) {
-            final LeaderElection leaderElection = 
leaderElectionService.createLeaderElection();
+            final LeaderElection leaderElection =
+                    
leaderElectionService.createLeaderElection(createRandomContenderID());
             assertThatThrownBy(
                             () ->
                                     new TestingContender("unused-address", 
leaderElection)
@@ -326,7 +334,8 @@ class DefaultLeaderElectionServiceTest {
                             testingContender.waitForLeader();
 
                             
assertThat(testingContender.getLeaderSessionID()).isNotNull();
-                            
assertThat(leaderElectionService.getLeaderSessionID()).isNotNull();
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
+                                    .isNotNull();
                             
assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty())
                                     .isFalse();
 
@@ -336,7 +345,7 @@ class DefaultLeaderElectionServiceTest {
                                     .as(
                                             "The LeaderContender should have 
been informed about the leadership loss.")
                                     .isNull();
-                            
assertThat(leaderElectionService.getLeaderSessionID())
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .as(
                                             "The LeaderElectionService should 
have its internal state cleaned.")
                                     .isNull();
@@ -358,7 +367,8 @@ class DefaultLeaderElectionServiceTest {
 
                             final LeaderInformation expectedLeader =
                                     LeaderInformation.known(
-                                            
leaderElectionService.getLeaderSessionID(), TEST_URL);
+                                            
leaderElectionService.getLeaderSessionID(contenderID),
+                                            TEST_URL);
 
                             // Leader information changed on external storage. 
It should be
                             // corrected.
@@ -386,9 +396,13 @@ class DefaultLeaderElectionServiceTest {
 
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
 
-                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
expectedSessionID))
                                     .isFalse();
-                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
UUID.randomUUID()))
                                     .isFalse();
                         });
             }
@@ -406,9 +420,13 @@ class DefaultLeaderElectionServiceTest {
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
                             executorService.trigger();
 
-                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
expectedSessionID))
                                     .isTrue();
-                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
UUID.randomUUID()))
                                     .isFalse();
                         });
             }
@@ -427,7 +445,9 @@ class DefaultLeaderElectionServiceTest {
                             executorService.trigger();
                             testingLeaderElectionDriver.notLeader();
 
-                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
expectedSessionID))
                                     .as(
                                             "No operation should be handled 
anymore after the HA backend "
                                                     + "indicated leadership 
loss even if the onRevokeLeadership wasn't "
@@ -435,7 +455,9 @@ class DefaultLeaderElectionServiceTest {
                                                     + "the leadership in the 
meantime already based on the HA "
                                                     + "backend's decision.")
                                     .isFalse();
-                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
UUID.randomUUID()))
                                     .isFalse();
                         });
             }
@@ -456,9 +478,13 @@ class DefaultLeaderElectionServiceTest {
                             testingLeaderElectionDriver.notLeader();
                             executorService.trigger();
 
-                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
expectedSessionID))
                                     .isFalse();
-                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
UUID.randomUUID()))
                                     .isFalse();
                         });
             }
@@ -477,7 +503,9 @@ class DefaultLeaderElectionServiceTest {
 
                             leaderElection.close();
 
-                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                            assertThat(
+                                            
leaderElectionService.hasLeadership(
+                                                    contenderID, 
expectedSessionID))
                                     .isFalse();
                         });
             }
@@ -510,7 +538,7 @@ class DefaultLeaderElectionServiceTest {
                             leaderElection.close();
                             testingLeaderElectionDriver.isLeader();
 
-                            
assertThat(leaderElectionService.getLeaderSessionID())
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .as(
                                             "The grant event shouldn't have 
been processed by the LeaderElectionService.")
                                     .isNull();
@@ -548,7 +576,8 @@ class DefaultLeaderElectionServiceTest {
                 runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
-                            final UUID oldSessionId = 
leaderElectionService.getLeaderSessionID();
+                            final UUID oldSessionId =
+                                    
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(testingContender.getLeaderSessionID())
                                     .isEqualTo(oldSessionId);
 
@@ -571,12 +600,12 @@ class DefaultLeaderElectionServiceTest {
                         () -> {
                             testingLeaderElectionDriver.isLeader();
                             final UUID currentLeaderSessionId =
-                                    leaderElectionService.getLeaderSessionID();
+                                    
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(currentLeaderSessionId).isNotNull();
 
                             // Old confirm call should be ignored.
                             
leaderElection.confirmLeadership(UUID.randomUUID(), TEST_URL);
-                            
assertThat(leaderElectionService.getLeaderSessionID())
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .isEqualTo(currentLeaderSessionId);
                         });
             }
@@ -591,7 +620,7 @@ class DefaultLeaderElectionServiceTest {
                         () -> {
                             testingLeaderElectionDriver.isLeader();
                             final UUID currentLeaderSessionId =
-                                    leaderElectionService.getLeaderSessionID();
+                                    
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(currentLeaderSessionId).isNotNull();
 
                             testingLeaderElectionDriver.notLeader();
@@ -599,7 +628,8 @@ class DefaultLeaderElectionServiceTest {
                             // Old confirm call should be ignored.
                             
leaderElection.confirmLeadership(currentLeaderSessionId, TEST_URL);
 
-                            
assertThat(leaderElectionService.getLeaderSessionID()).isNull();
+                            
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
+                                    .isNull();
                         });
             }
         };
@@ -656,7 +686,8 @@ class DefaultLeaderElectionServiceTest {
                 new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
         leaderElectionService.startLeaderElectionBackend();
 
-        final LeaderElection leaderElection = 
leaderElectionService.createLeaderElection();
+        final LeaderElection leaderElection =
+                
leaderElectionService.createLeaderElection(createRandomContenderID());
         final TestingContender testingContender = new 
TestingContender(TEST_URL, leaderElection);
         testingContender.startLeaderElection();
 
@@ -697,12 +728,15 @@ class DefaultLeaderElectionServiceTest {
                         (leaderElectionEventHandler, errorHandler) -> driver);
         testInstance.startLeaderElectionBackend();
 
+        final String contenderID = "contender-id";
         final String address = "leader-address";
-        final LeaderElection leaderElection = 
testInstance.createLeaderElection();
+        final LeaderElection leaderElection = 
testInstance.createLeaderElection(contenderID);
         leaderElection.startLeaderElection(
                 TestingGenericLeaderContender.newBuilder()
                         .setGrantLeadershipConsumer(
-                                sessionID -> 
testInstance.confirmLeadership(sessionID, address))
+                                sessionID ->
+                                        testInstance.confirmLeadership(
+                                                contenderID, sessionID, 
address))
                         .build());
 
         // initial messages to initialize usedLeaderSessionID and 
confirmedLeaderInformation
@@ -761,7 +795,8 @@ class DefaultLeaderElectionServiceTest {
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(driverFactory);
         testInstance.startLeaderElectionBackend();
-        final LeaderElection leaderElection = 
testInstance.createLeaderElection();
+        final LeaderElection leaderElection =
+                testInstance.createLeaderElection(createRandomContenderID());
         leaderElection.startLeaderElection(contender);
 
         final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
@@ -775,8 +810,14 @@ class DefaultLeaderElectionServiceTest {
         testInstance.close();
     }
 
+    private static String createRandomContenderID() {
+        return String.format("contender-id-%s", UUID.randomUUID());
+    }
+
     private static class Context {
 
+        final String contenderID = createRandomContenderID();
+
         DefaultLeaderElectionService leaderElectionService;
         TestingContender testingContender;
 
@@ -807,7 +848,7 @@ class DefaultLeaderElectionServiceTest {
                                 driverFactory, leaderEventOperationExecutor);
                 leaderElectionService.startLeaderElectionBackend();
 
-                leaderElection = leaderElectionService.createLeaderElection();
+                leaderElection = 
leaderElectionService.createLeaderElection(contenderID);
                 testingContender = new TestingContender(TEST_URL, 
leaderElection);
                 testingContender.startLeaderElection();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
index 3e1de05856a..c5346a27854 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
@@ -18,34 +18,44 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.TriConsumer;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class DefaultLeaderElectionTest {
 
+    private static final String DEFAULT_TEST_CONTENDER_ID = 
"test-contender-id";
+
     @Test
     void testContenderRegistration() throws Exception {
+        final AtomicReference<String> contenderIDRef = new AtomicReference<>();
         final AtomicReference<LeaderContender> contenderRef = new 
AtomicReference<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
-                        .setRegisterConsumer(contenderRef::set)
+                        .setRegisterConsumer(
+                                (actualContenderID, actualContender) -> {
+                                    contenderIDRef.set(actualContenderID);
+                                    contenderRef.set(actualContender);
+                                })
                         .build();
-        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+        try (final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID)) {
 
             final LeaderContender contender = 
TestingGenericLeaderContender.newBuilder().build();
             testInstance.startLeaderElection(contender);
 
+            assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID);
             assertThat(contenderRef.get()).isSameAs(contender);
         }
     }
@@ -54,7 +64,8 @@ class DefaultLeaderElectionTest {
     void testContenderRegistrationNull() throws Exception {
         try (final DefaultLeaderElection testInstance =
                 new DefaultLeaderElection(
-                        
TestingAbstractLeaderElectionService.newBuilder().build())) {
+                        
TestingAbstractLeaderElectionService.newBuilder().build(),
+                        DEFAULT_TEST_CONTENDER_ID)) {
             assertThatThrownBy(() -> testInstance.startLeaderElection(null))
                     .isInstanceOf(NullPointerException.class);
         }
@@ -67,11 +78,12 @@ class DefaultLeaderElectionTest {
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
                         .setRegisterConsumer(
-                                contender -> {
+                                (actualContenderID, actualContender) -> {
                                     throw expectedException;
                                 })
                         .build();
-        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+        try (final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID)) {
             assertThatThrownBy(
                             () ->
                                     testInstance.startLeaderElection(
@@ -82,60 +94,64 @@ class DefaultLeaderElectionTest {
 
     @Test
     void testLeaderConfirmation() throws Exception {
+        final AtomicReference<String> contenderIDRef = new AtomicReference<>();
         final AtomicReference<UUID> leaderSessionIDRef = new 
AtomicReference<>();
         final AtomicReference<String> leaderAddressRef = new 
AtomicReference<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
                         .setConfirmLeadershipConsumer(
-                                (leaderSessionID, address) -> {
+                                (contenderID, leaderSessionID, address) -> {
+                                    contenderIDRef.set(contenderID);
                                     leaderSessionIDRef.set(leaderSessionID);
                                     leaderAddressRef.set(address);
                                 })
                         .build();
-        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+        try (final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID)) {
 
             final UUID expectedLeaderSessionID = UUID.randomUUID();
             final String expectedAddress = "random-address";
             testInstance.confirmLeadership(expectedLeaderSessionID, 
expectedAddress);
 
-            
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
-            assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress);
+            assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID);
+            assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID);
+            assertThat(leaderAddressRef).hasValue(expectedAddress);
         }
     }
 
     @Test
     void testClose() throws Exception {
-        final CompletableFuture<LeaderContender> actualContender = new 
CompletableFuture<>();
+        final CompletableFuture<String> actualContenderID = new 
CompletableFuture<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
-                        .setRegisterConsumer(ignoredContender -> {})
-                        .setRemoveConsumer(actualContender::complete)
+                        .setRegisterConsumer((ignoredContenderID, 
ignoredContender) -> {})
+                        .setRemoveConsumer(actualContenderID::complete)
                         .build();
 
-        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
+        final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID);
 
-        final LeaderContender contender = 
TestingGenericLeaderContender.newBuilder().build();
-        testInstance.startLeaderElection(contender);
+        
testInstance.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
         testInstance.close();
 
-        assertThat(actualContender).isCompletedWithValue(contender);
+        
assertThat(actualContenderID).isCompletedWithValue(DEFAULT_TEST_CONTENDER_ID);
     }
 
     @Test
     void testCloseWithoutStart() throws Exception {
-        final CompletableFuture<LeaderContender> actualContender = new 
CompletableFuture<>();
+        final CompletableFuture<String> actualContenderID = new 
CompletableFuture<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
-                        .setRemoveConsumer(actualContender::complete)
+                        .setRemoveConsumer(actualContenderID::complete)
                         .build();
 
-        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
+        final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID);
         testInstance.close();
 
-        assertThat(actualContender)
-                .as(
-                        "No removal should be triggered if there's no 
contender that need to be deregistered.")
-                .isNotDone();
+        assertThatFuture(actualContenderID)
+                .eventuallySucceeds()
+                .isEqualTo(DEFAULT_TEST_CONTENDER_ID);
     }
 
     @Test
@@ -149,37 +165,41 @@ class DefaultLeaderElectionTest {
     }
 
     private void testHasLeadership(boolean expectedReturnValue) throws 
Exception {
+        final AtomicReference<String> contenderIDRef = new AtomicReference<>();
         final AtomicReference<UUID> leaderSessionIDRef = new 
AtomicReference<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
                         .setHasLeadershipFunction(
-                                leaderSessionID -> {
-                                    leaderSessionIDRef.set(leaderSessionID);
+                                (actualContenderID, actualLeaderSessionID) -> {
+                                    contenderIDRef.set(actualContenderID);
+                                    
leaderSessionIDRef.set(actualLeaderSessionID);
                                     return expectedReturnValue;
                                 })
                         .build();
-        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+        try (final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(parentService, 
DEFAULT_TEST_CONTENDER_ID)) {
 
             final UUID expectedLeaderSessionID = UUID.randomUUID();
             assertThat(testInstance.hasLeadership(expectedLeaderSessionID))
                     .isEqualTo(expectedReturnValue);
-            
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+            assertThat(contenderIDRef).hasValue(DEFAULT_TEST_CONTENDER_ID);
+            assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID);
         }
     }
 
     private static class TestingAbstractLeaderElectionService
             extends AbstractLeaderElectionService {
 
-        private final ThrowingConsumer<LeaderContender, Exception> 
registerConsumer;
-        private final Consumer<LeaderContender> removeConsumer;
-        private final BiConsumer<UUID, String> confirmLeadershipConsumer;
-        private final Function<UUID, Boolean> hasLeadershipFunction;
+        private final BiConsumerWithException<String, LeaderContender, 
Exception> registerConsumer;
+        private final Consumer<String> removeConsumer;
+        private final TriConsumer<String, UUID, String> 
confirmLeadershipConsumer;
+        private final BiFunction<String, UUID, Boolean> hasLeadershipFunction;
 
         private TestingAbstractLeaderElectionService(
-                ThrowingConsumer<LeaderContender, Exception> registerConsumer,
-                Consumer<LeaderContender> removeConsumer,
-                BiConsumer<UUID, String> confirmLeadershipConsumer,
-                Function<UUID, Boolean> hasLeadershipFunction) {
+                BiConsumerWithException<String, LeaderContender, Exception> 
registerConsumer,
+                Consumer<String> removeConsumer,
+                TriConsumer<String, UUID, String> confirmLeadershipConsumer,
+                BiFunction<String, UUID, Boolean> hasLeadershipFunction) {
             super();
 
             this.registerConsumer = registerConsumer;
@@ -189,39 +209,40 @@ class DefaultLeaderElectionTest {
         }
 
         @Override
-        protected void register(LeaderContender contender) throws Exception {
-            registerConsumer.accept(contender);
+        protected void register(String contenderID, LeaderContender contender) 
throws Exception {
+            registerConsumer.accept(contenderID, contender);
         }
 
         @Override
-        protected void remove(LeaderContender contender) {
-            removeConsumer.accept(contender);
+        protected void remove(String contenderID) {
+            removeConsumer.accept(contenderID);
         }
 
         @Override
-        protected void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
-            confirmLeadershipConsumer.accept(leaderSessionID, leaderAddress);
+        protected void confirmLeadership(
+                String contenderID, UUID leaderSessionID, String 
leaderAddress) {
+            confirmLeadershipConsumer.accept(contenderID, leaderSessionID, 
leaderAddress);
         }
 
         @Override
-        protected boolean hasLeadership(UUID leaderSessionId) {
-            return hasLeadershipFunction.apply(leaderSessionId);
+        protected boolean hasLeadership(String contenderID, UUID 
leaderSessionId) {
+            return hasLeadershipFunction.apply(contenderID, leaderSessionId);
         }
 
         public static Builder newBuilder() {
             return new Builder()
                     .setRegisterConsumer(
-                            contender -> {
+                            (contenderID, contender) -> {
                                 throw new 
UnsupportedOperationException("register not supported");
                             })
-                    .setRemoveConsumer(contender -> {})
+                    .setRemoveConsumer(contenderID -> {})
                     .setConfirmLeadershipConsumer(
-                            (leaderSessionID, address) -> {
+                            (contenderID, leaderSessionID, address) -> {
                                 throw new UnsupportedOperationException(
                                         "confirmLeadership not supported");
                             })
                     .setHasLeadershipFunction(
-                            leaderSessionID -> {
+                            (contenderID, leaderSessionID) -> {
                                 throw new UnsupportedOperationException(
                                         "hasLeadership not supported");
                             });
@@ -229,33 +250,32 @@ class DefaultLeaderElectionTest {
 
         private static class Builder {
 
-            private ThrowingConsumer<LeaderContender, Exception> 
registerConsumer =
-                    ignoredContender -> {};
-            private Consumer<LeaderContender> removeConsumer;
-            private BiConsumer<UUID, String> confirmLeadershipConsumer =
-                    (ignoredSessionID, ignoredAddress) -> {};
-            private Function<UUID, Boolean> hasLeadershipFunction = 
ignoredSessiondID -> false;
+            private BiConsumerWithException<String, LeaderContender, 
Exception> registerConsumer;
+            private Consumer<String> removeConsumer;
+            private TriConsumer<String, UUID, String> 
confirmLeadershipConsumer;
+            private BiFunction<String, UUID, Boolean> hasLeadershipFunction;
 
             private Builder() {}
 
             public Builder setRegisterConsumer(
-                    ThrowingConsumer<LeaderContender, Exception> 
registerConsumer) {
+                    BiConsumerWithException<String, LeaderContender, 
Exception> registerConsumer) {
                 this.registerConsumer = registerConsumer;
                 return this;
             }
 
-            public Builder setRemoveConsumer(Consumer<LeaderContender> 
removeConsumer) {
+            public Builder setRemoveConsumer(Consumer<String> removeConsumer) {
                 this.removeConsumer = removeConsumer;
                 return this;
             }
 
             public Builder setConfirmLeadershipConsumer(
-                    BiConsumer<UUID, String> confirmLeadershipConsumer) {
+                    TriConsumer<String, UUID, String> 
confirmLeadershipConsumer) {
                 this.confirmLeadershipConsumer = confirmLeadershipConsumer;
                 return this;
             }
 
-            public Builder setHasLeadershipFunction(Function<UUID, Boolean> 
hasLeadershipFunction) {
+            public Builder setHasLeadershipFunction(
+                    BiFunction<String, UUID, Boolean> hasLeadershipFunction) {
                 this.hasLeadershipFunction = hasLeadershipFunction;
                 return this;
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index efa18e6ab83..9cc66385187 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -204,7 +204,7 @@ public class LeaderElectionTest {
 
         @Override
         public LeaderElection createLeaderElection() {
-            return leaderElectionService.createLeaderElection();
+            return 
leaderElectionService.createLeaderElection("random-contender-id");
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 24d0532768e..90d729814d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -146,7 +146,8 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
         
client.getConnectionStateListenable().addListener(connectionStateListener);
 
         final TestingContender contender = new TestingContender();
-        try (LeaderElection leaderElection = 
leaderElectionService.createLeaderElection()) {
+        try (LeaderElection leaderElection =
+                
leaderElectionService.createLeaderElection("random-contender-id")) {
             leaderElection.startLeaderElection(contender);
 
             contender.awaitGrantLeadership();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index c0047a8a58d..c7d58f3dcd9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -189,7 +189,8 @@ class ZooKeeperLeaderElectionTest {
             for (int i = 0; i < num; i++) {
                 leaderElectionService[i] =
                         
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
-                leaderElections[i] = 
leaderElectionService[i].createLeaderElection();
+                leaderElections[i] =
+                        
leaderElectionService[i].createLeaderElection("random-contender-id");
                 contenders[i] = new TestingContender(createAddress(i), 
leaderElections[i]);
 
                 LOG.debug("Start leader election service for contender #{}.", 
i);
@@ -270,6 +271,7 @@ class ZooKeeperLeaderElectionTest {
         int num = 3;
         int numTries = 30;
 
+        final String contenderID = "random-contender-id";
         DefaultLeaderElectionService[] leaderElectionService =
                 new DefaultLeaderElectionService[num];
         LeaderElection[] leaderElections = new LeaderElection[num];
@@ -287,7 +289,7 @@ class ZooKeeperLeaderElectionTest {
             for (int i = 0; i < num; i++) {
                 leaderElectionService[i] =
                         
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
-                leaderElections[i] = 
leaderElectionService[i].createLeaderElection();
+                leaderElections[i] = 
leaderElectionService[i].createLeaderElection(contenderID);
                 contenders[i] =
                         new TestingContender(LEADER_ADDRESS + "_" + i + "_0", 
leaderElections[i]);
 
@@ -320,7 +322,8 @@ class ZooKeeperLeaderElectionTest {
                     // create new leader election service which takes part in 
the leader election
                     leaderElectionService[index] =
                             
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
-                    leaderElections[index] = 
leaderElectionService[index].createLeaderElection();
+                    leaderElections[index] =
+                            
leaderElectionService[index].createLeaderElection(contenderID);
 
                     contenders[index] =
                             new TestingContender(

Reply via email to