This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 728e679d354 [FLINK-31783][runtime] Migrates 
DefaultLeaderElectionService from LeaderElectionDriver to 
MultipleComponentLeaderElectionDriver
728e679d354 is described below

commit 728e679d354c2c120afe7091f42a91b2e4701036
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Tue Jun 20 15:25:24 2023 +0200

    [FLINK-31783][runtime] Migrates DefaultLeaderElectionService from 
LeaderElectionDriver to MultipleComponentLeaderElectionDriver
    
    Signed-off-by: Matthias Pohl <matthias.p...@aiven.io>
---
 ...netesMultipleComponentLeaderElectionDriver.java |   3 +-
 ...sMultipleComponentLeaderElectionHaServices.java |   5 +-
 .../highavailability/AbstractHaServices.java       |  11 +-
 ...rMultipleComponentLeaderElectionHaServices.java |   5 +-
 .../DefaultLeaderElectionService.java              |  27 +-
 ...aultMultipleComponentLeaderElectionService.java |   9 +-
 .../leaderelection/LeaderInformationRegister.java  |  38 ++
 .../MultipleComponentLeaderElectionDriver.java     |   4 +-
 ...ltipleComponentLeaderElectionDriverAdapter.java |  55 ++-
 ...omponentLeaderElectionDriverAdapterFactory.java |  11 +-
 .../MultipleComponentLeaderElectionService.java    |   4 +-
 ...eeperMultipleComponentLeaderElectionDriver.java |   6 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  60 ---
 .../highavailability/AbstractHaServicesTest.java   |   5 +-
 .../JobMasterServiceLeadershipRunnerTest.java      |  50 ++-
 .../DefaultLeaderElectionServiceTest.java          | 473 ++++++++++++---------
 ...MultipleComponentLeaderElectionServiceTest.java |  92 ++--
 .../leaderelection/LeaderElectionEvent.java        |  13 +
 .../runtime/leaderelection/LeaderElectionTest.java |   7 +-
 .../TestingGenericLeaderElectionDriver.java        |  94 ----
 .../TestingLeaderElectionDriver.java               | 303 +++++++++----
 .../TestingLeaderElectionEventHandler.java         | 144 -------
 .../TestingLeaderElectionListener.java             |   5 +-
 ...stingMultipleComponentLeaderElectionDriver.java | 117 -----
 ...ltipleComponentLeaderElectionDriverFactory.java |  49 ---
 ...KeeperLeaderElectionConnectionHandlingTest.java |   9 +-
 .../ZooKeeperLeaderElectionTest.java               |  32 +-
 ...rMultipleComponentLeaderElectionDriverTest.java |   2 +-
 28 files changed, 750 insertions(+), 883 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
index 98deaa8cbfa..c448f81656d 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -211,7 +212,7 @@ public class KubernetesMultipleComponentLeaderElectionDriver
     private class LeaderCallbackHandlerImpl extends 
KubernetesLeaderElector.LeaderCallbackHandler {
         @Override
         public void isLeader() {
-            leaderElectionListener.isLeader();
+            leaderElectionListener.isLeader(UUID.randomUUID());
         }
 
         @Override
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
index 9a77040a125..038214dbe86 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.highavailability.AbstractHaServices;
 import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import 
org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
+import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
 import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -105,7 +105,8 @@ public class 
KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac
     }
 
     @Override
-    protected LeaderElectionDriverFactory 
createLeaderElectionDriverFactory(String leaderName) {
+    protected MultipleComponentLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
+            String leaderName) {
         final MultipleComponentLeaderElectionService 
multipleComponentLeaderElectionService =
                 getOrInitializeSingleLeaderElectionService();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 401e63cc959..a7b89c46e2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.LeaderElection;
-import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -256,14 +256,15 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
     }
 
     /**
-     * Create {@link LeaderElectionDriverFactory} instance for the specified 
leaderName.
+     * Create {@link MultipleComponentLeaderElectionDriverFactory} instance 
for the specified
+     * leaderName.
      *
      * @param leaderName ConfigMap name in Kubernetes or child node path in 
Zookeeper.
-     * @return Return {@code LeaderElectionDriverFactory} used for the {@link
+     * @return Return {@code MultipleComponentLeaderElectionDriverFactory} 
used for the {@link
      *     LeaderElectionService}.
      */
-    protected abstract LeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
-            String leaderName);
+    protected abstract MultipleComponentLeaderElectionDriverFactory
+            createLeaderElectionDriverFactory(String leaderName);
 
     /**
      * Create leader retrieval service with specified leaderName.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
index 901658bad45..bc95589f3bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import 
org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
+import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
 import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionService;
 import 
org.apache.flink.runtime.leaderelection.ZooKeeperMultipleComponentLeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -87,7 +87,8 @@ public class 
ZooKeeperMultipleComponentLeaderElectionHaServices
     }
 
     @Override
-    protected LeaderElectionDriverFactory 
createLeaderElectionDriverFactory(String leaderName) {
+    protected MultipleComponentLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
+            String leaderName) {
         return 
getOrInitializeSingleLeaderElectionService().createDriverFactory(leaderName);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 88bf9caa8e3..ddde3a5c819 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -54,7 +54,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
 
     private final Object lock = new Object();
 
-    private final LeaderElectionDriverFactory leaderElectionDriverFactory;
+    private final MultipleComponentLeaderElectionDriverFactory 
leaderElectionDriverFactory;
 
     /**
      * {@code contenderID} being {@code null} indicates that no {@link 
LeaderContender} is
@@ -104,7 +104,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
      *
      * <p>The driver is guarded by this instance's {@link #running} state.
      */
-    private LeaderElectionDriver leaderElectionDriver;
+    private MultipleComponentLeaderElectionDriver leaderElectionDriver;
 
     /**
      * This {@link ExecutorService} is used for running the leader event 
handling logic. Production
@@ -117,7 +117,8 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
 
     private final FatalErrorHandler fallbackErrorHandler;
 
-    public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
+    public DefaultLeaderElectionService(
+            MultipleComponentLeaderElectionDriverFactory 
leaderElectionDriverFactory) {
         this(
                 leaderElectionDriverFactory,
                 t ->
@@ -127,7 +128,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
 
     @VisibleForTesting
     public DefaultLeaderElectionService(
-            LeaderElectionDriverFactory leaderElectionDriverFactory,
+            MultipleComponentLeaderElectionDriverFactory 
leaderElectionDriverFactory,
             FatalErrorHandler fallbackErrorHandler) {
         this(
                 leaderElectionDriverFactory,
@@ -139,7 +140,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
 
     @VisibleForTesting
     DefaultLeaderElectionService(
-            LeaderElectionDriverFactory leaderElectionDriverFactory,
+            MultipleComponentLeaderElectionDriverFactory 
leaderElectionDriverFactory,
             FatalErrorHandler fallbackErrorHandler,
             ExecutorService leadershipOperationExecutor) {
         this.leaderElectionDriverFactory = 
checkNotNull(leaderElectionDriverFactory);
@@ -176,8 +177,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
             running = true;
 
             leaderElectionDriver =
-                    leaderElectionDriverFactory.createLeaderElectionDriver(
-                            this, new LeaderElectionFatalErrorHandler());
+                    leaderElectionDriverFactory.create(this, new 
LeaderElectionFatalErrorHandler());
 
             LOG.info("Instantiating DefaultLeaderElectionService with {}.", 
leaderElectionDriver);
         }
@@ -235,7 +235,7 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
                         "DefaultLeaderElectionService is stopping while having 
the leadership acquired. The revoke event is forwarded to the 
LeaderContender.");
 
                 if (leaderElectionDriver.hasLeadership()) {
-                    
leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());
+                    leaderElectionDriver.deleteLeaderInformation(contenderID);
                     LOG.debug("Leader information is cleaned up while 
stopping.");
                 }
             } else {
@@ -303,7 +303,8 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
 
                 confirmedLeaderInformation =
                         LeaderInformation.known(leaderSessionID, 
leaderAddress);
-                
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+                leaderElectionDriver.publishLeaderInformation(
+                        contenderID, confirmedLeaderInformation);
             } else {
                 if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                     LOG.debug(
@@ -463,13 +464,13 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
                     LOG.debug(
                             "Writing leader information by {} since the 
external storage is empty.",
                             leaderContender.getDescription());
-                    
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
                 } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
                     // the data field does not correspond to the expected 
leader information
                     LOG.debug(
                             "Correcting leader information by {}.",
                             leaderContender.getDescription());
-                    
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
                 }
             }
         } else {
@@ -516,8 +517,8 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    public void isLeader() {
-        onGrantLeadership(UUID.randomUUID());
+    public void isLeader(UUID leaderSessionID) {
+        onGrantLeadership(leaderSessionID);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
index 148694e8e78..2775b1c5ce9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
@@ -109,7 +109,7 @@ public class DefaultMultipleComponentLeaderElectionService
     }
 
     @Override
-    public LeaderElectionDriverFactory createDriverFactory(String componentId) 
{
+    public MultipleComponentLeaderElectionDriverFactory 
createDriverFactory(String componentId) {
         return new 
MultipleComponentLeaderElectionDriverAdapterFactory(componentId, this);
     }
 
@@ -169,8 +169,7 @@ public class DefaultMultipleComponentLeaderElectionService
     }
 
     @Override
-    public void isLeader() {
-        final UUID newLeaderSessionId = UUID.randomUUID();
+    public void isLeader(UUID leaderSessionID) {
         synchronized (lock) {
             if (!running) {
                 return;
@@ -179,11 +178,11 @@ public class DefaultMultipleComponentLeaderElectionService
             Preconditions.checkState(
                     currentLeaderSessionId == null,
                     "notLeader() wasn't called by the LeaderElection backend 
before assigning leadership to this LeaderElectionService.");
-            currentLeaderSessionId = newLeaderSessionId;
+            currentLeaderSessionId = leaderSessionID;
 
             forEachLeaderElectionEventHandler(
                     leaderElectionEventHandler ->
-                            
leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
+                            
leaderElectionEventHandler.onGrantLeadership(leaderSessionID));
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegister.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegister.java
index aa9b9a6e0c5..d9ac261fbae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegister.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegister.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -28,14 +31,49 @@ import java.util.Optional;
  */
 public class LeaderInformationRegister {
 
+    private static final LeaderInformationRegister EMPTY_REGISTER =
+            new LeaderInformationRegister(Collections.emptyMap());
+
     private final Map<String, LeaderInformation> 
leaderInformationPerContenderID;
 
+    public static LeaderInformationRegister empty() {
+        return EMPTY_REGISTER;
+    }
+
     public static LeaderInformationRegister of(
             String contenderID, LeaderInformation leaderInformation) {
         return new LeaderInformationRegister(
                 Collections.singletonMap(contenderID, leaderInformation));
     }
 
+    public static LeaderInformationRegister merge(
+            @Nullable LeaderInformationRegister leaderInformationRegister,
+            String contenderID,
+            LeaderInformation leaderInformation) {
+        final Map<String, LeaderInformation> existingLeaderInformation =
+                new HashMap<>(
+                        leaderInformationRegister == null
+                                ? Collections.emptyMap()
+                                : 
leaderInformationRegister.leaderInformationPerContenderID);
+        if (leaderInformation.isEmpty()) {
+            existingLeaderInformation.remove(contenderID);
+        } else {
+            existingLeaderInformation.put(contenderID, leaderInformation);
+        }
+
+        return new LeaderInformationRegister(existingLeaderInformation);
+    }
+
+    public static LeaderInformationRegister clear(
+            @Nullable LeaderInformationRegister leaderInformationRegister, 
String contenderID) {
+        if (leaderInformationRegister == null
+                || 
!leaderInformationRegister.getRegisteredContenderIDs().iterator().hasNext()) {
+            return LeaderInformationRegister.empty();
+        }
+
+        return merge(leaderInformationRegister, contenderID, 
LeaderInformation.empty());
+    }
+
     public LeaderInformationRegister(
             Map<String, LeaderInformation> leaderInformationPerContenderID) {
         this.leaderInformationPerContenderID = leaderInformationPerContenderID;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
index 9c80b5ef24b..c27554483ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import java.util.UUID;
+
 /**
  * A leader election driver that allows to write {@link LeaderInformation} for 
multiple components.
  */
@@ -51,7 +53,7 @@ public interface MultipleComponentLeaderElectionDriver 
extends AutoCloseable {
     interface Listener {
 
         /** Callback that is called once the driver obtains the leadership. */
-        void isLeader();
+        void isLeader(UUID leaderSessionID);
 
         /** Callback that is called once the driver loses the leadership. */
         void notLeader();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapter.java
index c866609dfb7..66f049aa6e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapter.java
@@ -20,14 +20,29 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * {@link LeaderElectionDriver} adapter that multiplexes the leader election 
of a component into a
  * single leader election via {@link MultipleComponentLeaderElectionService}.
  */
-final class MultipleComponentLeaderElectionDriverAdapter implements 
LeaderElectionDriver {
+final class MultipleComponentLeaderElectionDriverAdapter
+        implements LeaderElectionDriver, MultipleComponentLeaderElectionDriver 
{
     private final String componentId;
     private final MultipleComponentLeaderElectionService 
multipleComponentLeaderElectionService;
 
+    MultipleComponentLeaderElectionDriverAdapter(
+            String componentID,
+            MultipleComponentLeaderElectionService 
multipleComponentLeaderElectionService,
+            MultipleComponentLeaderElectionDriver.Listener listener) {
+        this(
+                componentID,
+                multipleComponentLeaderElectionService,
+                // here's where the componentID of the legacy code would be 
translated into
+                // the contenderID that will be used in the 
DefaultLeaderElectionService eventually
+                new ListenerWrapper("unused-contender-id", listener));
+    }
+
     MultipleComponentLeaderElectionDriverAdapter(
             String componentId,
             MultipleComponentLeaderElectionService 
multipleComponentLeaderElectionService,
@@ -51,8 +66,46 @@ final class MultipleComponentLeaderElectionDriverAdapter 
implements LeaderElecti
         return 
multipleComponentLeaderElectionService.hasLeadership(componentId);
     }
 
+    @Override
+    public void publishLeaderInformation(
+            String ignoredContenderID, LeaderInformation leaderInformation) {
+        writeLeaderInformation(leaderInformation);
+    }
+
+    @Override
+    public void deleteLeaderInformation(String ignoredContenderID) {
+        writeLeaderInformation(LeaderInformation.empty());
+    }
+
     @Override
     public void close() throws Exception {
         
multipleComponentLeaderElectionService.unregisterLeaderElectionEventHandler(componentId);
     }
+
+    private static class ListenerWrapper implements LeaderElectionEventHandler 
{
+
+        private final String contenderID;
+        private final MultipleComponentLeaderElectionDriver.Listener listener;
+
+        public ListenerWrapper(
+                String contenderID, 
MultipleComponentLeaderElectionDriver.Listener listener) {
+            this.contenderID = contenderID;
+            this.listener = listener;
+        }
+
+        @Override
+        public void onGrantLeadership(UUID newLeaderSessionId) {
+            listener.isLeader(newLeaderSessionId);
+        }
+
+        @Override
+        public void onRevokeLeadership() {
+            listener.notLeader();
+        }
+
+        @Override
+        public void onLeaderInformationChange(LeaderInformation 
leaderInformation) {
+            listener.notifyLeaderInformationChange(contenderID, 
leaderInformation);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapterFactory.java
index bf62709502d..5039b90bc52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapterFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverAdapterFactory.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Preconditions;
 
 /** Factory for a {@link MultipleComponentLeaderElectionDriverAdapter}. */
 final class MultipleComponentLeaderElectionDriverAdapterFactory
-        implements LeaderElectionDriverFactory {
+        implements LeaderElectionDriverFactory, 
MultipleComponentLeaderElectionDriverFactory {
     private final String leaderName;
     private final MultipleComponentLeaderElectionService 
singleLeaderElectionService;
 
@@ -40,4 +40,13 @@ final class 
MultipleComponentLeaderElectionDriverAdapterFactory
         return new MultipleComponentLeaderElectionDriverAdapter(
                 leaderName, singleLeaderElectionService, leaderEventHandler);
     }
+
+    @Override
+    public MultipleComponentLeaderElectionDriver create(
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
+            throws Exception {
+        return new MultipleComponentLeaderElectionDriverAdapter(
+                leaderName, singleLeaderElectionService, 
leaderElectionListener);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionService.java
index 0ce739ebf3d..1e3816538d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionService.java
@@ -33,13 +33,13 @@ public interface MultipleComponentLeaderElectionService {
     void close() throws Exception;
 
     /**
-     * Creates a {@link LeaderElectionDriverFactory} for the given leader name.
+     * Creates a {@link MultipleComponentLeaderElectionDriverFactory} for the 
given leader name.
      *
      * @param componentId identifying the component for which to create a 
leader election driver
      *     factory
      * @return Leader election driver factory
      */
-    LeaderElectionDriverFactory createDriverFactory(String componentId);
+    MultipleComponentLeaderElectionDriverFactory createDriverFactory(String 
componentId);
 
     /**
      * Publishes the given leader information for the component identified by 
the given leader name.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
index e6cc17719b3..56a61ea544c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /** ZooKeeper based {@link MultipleComponentLeaderElectionDriver} 
implementation. */
@@ -195,8 +196,9 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
 
     @Override
     public void isLeader() {
-        LOG.debug("{} obtained the leadership.", this);
-        leaderElectionListener.isLeader();
+        final UUID leaderSessionID = UUID.randomUUID();
+        LOG.debug("{} obtained the leadership with session ID {}.", this, 
leaderSessionID);
+        leaderElectionListener.isLeader(leaderSessionID);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index b1e3917144d..52a00dc8b14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -40,11 +40,7 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreUtil;
 import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
-import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
-import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
-import 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
@@ -408,62 +404,6 @@ public class ZooKeeperUtils {
                 client, path, leaderInformationClearancePolicy);
     }
 
-    /**
-     * Creates a {@link DefaultLeaderElectionService} instance with {@link
-     * ZooKeeperLeaderElectionDriver}.
-     *
-     * @param client The {@link CuratorFramework} ZooKeeper client to use
-     * @return {@link DefaultLeaderElectionService} instance.
-     */
-    @Deprecated
-    public static DefaultLeaderElectionService 
createLeaderElectionService(CuratorFramework client)
-            throws Exception {
-
-        return createLeaderElectionService(client, "");
-    }
-
-    /**
-     * Creates a {@link DefaultLeaderElectionService} instance with {@link
-     * ZooKeeperLeaderElectionDriver}.
-     *
-     * @param client The {@link CuratorFramework} ZooKeeper client to use
-     * @param path The path for the leader election
-     * @return {@link DefaultLeaderElectionService} instance.
-     */
-    @Deprecated
-    public static DefaultLeaderElectionService createLeaderElectionService(
-            final CuratorFramework client, final String path) throws Exception 
{
-        final DefaultLeaderElectionService leaderElectionService =
-                new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
-        leaderElectionService.startLeaderElectionBackend();
-
-        return leaderElectionService;
-    }
-
-    /**
-     * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper.
-     *
-     * @param client The {@link CuratorFramework} ZooKeeper client to use
-     * @return {@link LeaderElectionDriverFactory} instance.
-     */
-    @Deprecated
-    public static ZooKeeperLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
-            final CuratorFramework client) {
-        return createLeaderElectionDriverFactory(client, "");
-    }
-
-    /**
-     * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper.
-     *
-     * @param client The {@link CuratorFramework} ZooKeeper client to use
-     * @param path The path suffix which we want to append
-     * @return {@link LeaderElectionDriverFactory} instance.
-     */
-    public static ZooKeeperLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
-            final CuratorFramework client, final String path) {
-        return new ZooKeeperLeaderElectionDriverFactory(client, path);
-    }
-
     public static void writeLeaderInformationToZooKeeper(
             LeaderInformation leaderInformation,
             CuratorFramework curatorFramework,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index ecdb21603f4..43a5ae6dc67 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
-import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
+import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.util.FlinkException;
@@ -201,7 +201,8 @@ class AbstractHaServicesTest {
         }
 
         @Override
-        protected LeaderElectionDriverFactory 
createLeaderElectionDriverFactory(String leaderName) {
+        protected MultipleComponentLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
+                String leaderName) {
             throw new UnsupportedOperationException("Not supported by this 
test implementation.");
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 4d06cc9af17..ad27a1aebd1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.runtime.leaderelection.LeaderInformationRegister;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -55,6 +56,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
@@ -67,6 +69,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -673,20 +676,27 @@ class JobMasterServiceLeadershipRunnerTest {
         }
     }
 
+    @Disabled
     @Test
     void 
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
             throws Exception {
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
-                testingLeaderElectionDriverFactory =
-                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>(LeaderInformationRegister.empty());
+
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(
+                                new AtomicBoolean(), storedLeaderInformation, 
new AtomicBoolean()));
 
         // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
         // in connection with the DefaultLeaderElectionService generates the 
nested locking
         final DefaultLeaderElectionService defaultLeaderElectionService =
-                new DefaultLeaderElectionService(
-                        testingLeaderElectionDriverFactory, fatalErrorHandler);
+                new DefaultLeaderElectionService(driverFactory, 
fatalErrorHandler);
         defaultLeaderElectionService.startLeaderElectionBackend();
 
+        final TestingLeaderElectionDriver currentLeaderDriver =
+                driverFactory.assertAndGetOnlyCreatedDriver();
+
         // latch to detect when we reached the first synchronized section 
having a lock on the
         // JobMasterServiceProcess#stop side
         final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
@@ -710,6 +720,9 @@ class JobMasterServiceLeadershipRunnerTest {
                                     return 
CompletableFuture.completedFuture(null);
                                 })
                         .build();
+        final String contenderID = "random-contender-id";
+        final LeaderElection leaderElection =
+                defaultLeaderElectionService.createLeaderElection(contenderID);
         try (final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .setClassLoaderLease(
@@ -737,7 +750,7 @@ class JobMasterServiceLeadershipRunnerTest {
                                                         // 
DefaultLeaderElectionService#lock in
                                                         // 
DefaultLeaderElectionService#onGrantLeadership,
                                                         // but we trigger this 
implicitly through
-                                                        // 
TestingLeaderElectionDriver#isLeader().
+                                                        // 
TestingLeaderElectionDriver#grantLeadership(UUID).
                                                         // Adding a short 
sleep can ensure that
                                                         // another thread 
successfully receives the
                                                         // leadership 
notification, so that the
@@ -753,25 +766,16 @@ class JobMasterServiceLeadershipRunnerTest {
                                         .setJobMasterServiceProcessFunction(
                                                 ignoredSessionId -> 
jobMasterServiceProcess)
                                         .build())
-                        .setLeaderElection(
-                                
defaultLeaderElectionService.createLeaderElection(
-                                        "random-contender-id"))
+                        .setLeaderElection(leaderElection)
                         .build()) {
             jobManagerRunner.start();
 
-            final TestingLeaderElectionDriver currentLeaderDriver =
-                    Preconditions.checkNotNull(
-                            
testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
             // grant leadership to create jobMasterServiceProcess
-            currentLeaderDriver.isLeader();
-
-            while 
(currentLeaderDriver.getLeaderInformation().getLeaderSessionID() == null
-                    || !jobManagerRunner
-                            .getLeaderElection()
-                            .hasLeadership(
-                                    currentLeaderDriver
-                                            .getLeaderInformation()
-                                            .getLeaderSessionID())) {
+            final UUID leaderSessionID = UUID.randomUUID();
+            defaultLeaderElectionService.isLeader(leaderSessionID);
+
+            while (!currentLeaderDriver.hasLeadership()
+                    || !leaderElection.hasLeadership(leaderSessionID)) {
                 Thread.sleep(100);
             }
 
@@ -788,8 +792,8 @@ class JobMasterServiceLeadershipRunnerTest {
                                 // order (i.e. no two grant or revoke events 
should appear after
                                 // each other). This requires the leadership 
to be revoked before
                                 // regaining leadership in this test.
-                                currentLeaderDriver.notLeader();
-                                currentLeaderDriver.isLeader();
+                                defaultLeaderElectionService.notLeader();
+                                
defaultLeaderElectionService.isLeader(UUID.randomUUID());
                             });
             grantLeadershipThread.start();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index 559879c823e..0006924f16f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -33,7 +32,9 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -50,43 +51,51 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testOnGrantAndRevokeLeadership() throws Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>(LeaderInformationRegister.empty());
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
                             // grant leadership
-                            testingLeaderElectionDriver.isLeader();
+                            final UUID leaderSessionID = UUID.randomUUID();
+                            grantLeadership(leaderSessionID);
 
                             testingContender.waitForLeader();
                             
assertThat(testingContender.getDescription()).isEqualTo(TEST_URL);
                             assertThat(testingContender.getLeaderSessionID())
                                     .isEqualTo(
-                                            
leaderElectionService.getLeaderSessionID(contenderID));
+                                            
leaderElectionService.getLeaderSessionID(contenderID))
+                                    .isEqualTo(leaderSessionID);
 
                             final LeaderInformation 
expectedLeaderInformationInHaBackend =
                                     LeaderInformation.known(
                                             
leaderElectionService.getLeaderSessionID(contenderID),
                                             TEST_URL);
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
                                     .as(
                                             "The HA backend should have its 
leader information updated.")
-                                    
.isEqualTo(expectedLeaderInformationInHaBackend);
+                                    
.hasValue(expectedLeaderInformationInHaBackend);
+
+                            revokeLeadership();
 
-                            // revoke leadership
-                            testingLeaderElectionDriver.notLeader();
                             testingContender.waitForRevokeLeader();
                             
assertThat(testingContender.getLeaderSessionID()).isNull();
                             
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .isNull();
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
                                     .as(
                                             "External storage is not touched 
by the leader session because the leadership is already lost.")
-                                    
.isEqualTo(expectedLeaderInformationInHaBackend);
+                                    
.hasValue(expectedLeaderInformationInHaBackend);
                         });
             }
         };
     }
 
+    /**
+     * Tests that we can shut down the DefaultLeaderElectionService if the 
used LeaderElectionDriver
+     * holds an internal lock. See FLINK-20008 for more details.
+     */
     @Test
     void testCloseGrantDeadlock() throws Exception {
         final OneShotLatch closeReachedLatch = new OneShotLatch();
@@ -94,28 +103,30 @@ class DefaultLeaderElectionServiceTest {
         final OneShotLatch grantReachedLatch = new OneShotLatch();
         final OneShotLatch grantContinueLatch = new OneShotLatch();
 
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
-                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
-                        eventHandler -> {},
-                        eventHandler -> {
-                            closeReachedLatch.trigger();
-                            closeContinueLatch.await();
-                        },
-                        leaderElectionEventHandler -> {
-                            grantReachedLatch.trigger();
-                            grantContinueLatch.awaitQuietly();
-                        });
+        final CompletableFuture<Void> driverCloseTriggered = new 
CompletableFuture<>();
+
+        final AtomicBoolean leadershipGranted = new AtomicBoolean();
+        final TestingLeaderElectionDriver.Builder driverBuilder =
+                TestingLeaderElectionDriver.newBuilder(leadershipGranted)
+                        .setCloseConsumer(
+                                lock -> {
+                                    closeReachedLatch.trigger();
+                                    closeContinueLatch.await();
+                                    try {
+                                        lock.lock();
+                                        driverCloseTriggered.complete(null);
+                                    } finally {
+                                        lock.unlock();
+                                    }
+                                });
 
-        final ManuallyTriggeredScheduledExecutorService executorService =
-                new ManuallyTriggeredScheduledExecutorService();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(driverBuilder);
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(
-                        driverFactory,
-                        
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
-                        executorService);
+                        driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler());
         testInstance.startLeaderElectionBackend();
-        final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
-        assertThat(driver).isNotNull();
+        final TestingLeaderElectionDriver driver = 
driverFactory.assertAndGetOnlyCreatedDriver();
 
         final Thread closeThread =
                 new Thread(
@@ -132,7 +143,26 @@ class DefaultLeaderElectionServiceTest {
         closeThread.start();
         closeReachedLatch.await();
 
-        final Thread grantThread = new Thread(driver::isLeader, "GrantThread");
+        final Thread grantThread =
+                new Thread(
+                        () -> {
+                            try {
+                                // simulates the grant process being triggered 
from the HA backend's
+                                // side where the same lock that is acquired 
during the driver's
+                                // process is also acquired while handling a 
leadership event
+                                // processing
+                                driver.getLock().lock();
+                                grantReachedLatch.trigger();
+                                grantContinueLatch.awaitQuietly();
+
+                                // grants leadership
+                                leadershipGranted.set(true);
+                                testInstance.isLeader(UUID.randomUUID());
+                            } finally {
+                                driver.getLock().unlock();
+                            }
+                        },
+                        "GrantThread");
 
         // triggers the service acquiring the leadership and, as a 
consequence, acquiring the
         // driver's lock
@@ -145,28 +175,22 @@ class DefaultLeaderElectionServiceTest {
 
         closeThread.join();
         grantThread.join();
+
+        
FlinkAssertions.assertThatFuture(driverCloseTriggered).eventuallySucceeds();
     }
 
-    /**
-     * With {@link MultipleComponentLeaderElectionDriverAdapter} and {@link
-     * DefaultMultipleComponentLeaderElectionService} it happens that {@link
-     * LeaderElectionEventHandler#onGrantLeadership(UUID)} happens while 
instantiating the {@link
-     * LeaderElectionDriver} (i.e. {@code 
MultipleComponentLeaderElectionDriverAdapter}). This test
-     * verifies that the grant event is handled properly.
-     */
     @Test
     void testGrantCallWhileInstantiatingDriver() throws Exception {
         final UUID expectedLeaderSessionID = UUID.randomUUID();
-        try (final TestingGenericLeaderElectionDriver driver =
-                        
TestingGenericLeaderElectionDriver.newBuilder().build();
-                final DefaultLeaderElectionService testInstance =
-                        new DefaultLeaderElectionService(
-                                (eventHandler, errorHandler) -> {
-                                    
eventHandler.onGrantLeadership(expectedLeaderSessionID);
-                                    return driver;
-                                },
-                                
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
-                                Executors.newDirectExecutorService())) {
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(
+                        (listener, errorHandler) -> {
+                            listener.isLeader(expectedLeaderSessionID);
+                            return TestingLeaderElectionDriver.newNoOpBuilder()
+                                    .build(listener, errorHandler);
+                        },
+                        
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
+                        Executors.newDirectExecutorService())) {
             testInstance.startLeaderElectionBackend();
 
             final LeaderElection leaderElection =
@@ -192,7 +216,7 @@ class DefaultLeaderElectionServiceTest {
                             leaderElection.close();
 
                             final UUID expectedSessionID = UUID.randomUUID();
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership(expectedSessionID);
 
                             try (LeaderElection anotherLeaderElection =
                                     
leaderElectionService.createLeaderElection(contenderID)) {
@@ -229,8 +253,7 @@ class DefaultLeaderElectionServiceTest {
                             // was already registered to the service
                             leaderElection.close();
 
-                            final UUID expectedSessionID = UUID.randomUUID();
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership();
                             executorService.trigger();
 
                             leaderElection =
@@ -253,19 +276,20 @@ class DefaultLeaderElectionServiceTest {
      */
     @Test
     void testOnRevokeCallWhileClosingService() throws Exception {
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
-                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
-                        LeaderElectionEventHandler::onRevokeLeadership);
+        final AtomicBoolean leadershipGranted = new AtomicBoolean();
+        final TestingLeaderElectionDriver.Builder driverBuilder =
+                TestingLeaderElectionDriver.newBuilder(leadershipGranted);
 
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(driverBuilder);
         try (final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(
                         driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler())) {
+            driverBuilder.setCloseConsumer(lock -> 
testInstance.onRevokeLeadership());
             testInstance.startLeaderElectionBackend();
 
-            final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
-            assertThat(driver).isNotNull();
-
-            driver.isLeader();
+            leadershipGranted.set(true);
+            testInstance.isLeader(UUID.randomUUID());
 
             final LeaderElection leaderElection =
                     
testInstance.createLeaderElection(createRandomContenderID());
@@ -281,34 +305,11 @@ class DefaultLeaderElectionServiceTest {
         }
     }
 
-    @Test
-    void testStopWhileHavingLeadership() throws Exception {
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
-                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
-
-        try (final DefaultLeaderElectionService testInstance =
-                new DefaultLeaderElectionService(
-                        driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler())) {
-            testInstance.startLeaderElectionBackend();
-
-            final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
-            assertThat(driver).isNotNull();
-
-            driver.isLeader();
-
-            final LeaderElection leaderElection =
-                    
testInstance.createLeaderElection(createRandomContenderID());
-            
leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
-
-            leaderElection.close();
-        }
-    }
-
     @Test
     void testContenderRegistrationWithoutDriverBeingInstantiatedFails() throws 
Exception {
         try (final DefaultLeaderElectionService leaderElectionService =
                 new DefaultLeaderElectionService(
-                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(),
+                        
TestingLeaderElectionDriver.Factory.createFactoryWithNoOpDriver(),
                         
fatalErrorHandlerExtension.getTestingFatalErrorHandler())) {
             final LeaderElection leaderElection =
                     
leaderElectionService.createLeaderElection(createRandomContenderID());
@@ -339,18 +340,20 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testProperCleanupOnLeaderElectionCloseWhenHoldingTheLeadership() 
throws Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>();
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
-                            testingContender.waitForLeader();
+                            final UUID leaderSessionID = UUID.randomUUID();
+                            grantLeadership(leaderSessionID);
 
                             
assertThat(testingContender.getLeaderSessionID()).isNotNull();
                             
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
-                                    .isNotNull();
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty())
-                                    .isFalse();
+                                    .isEqualTo(leaderSessionID);
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    
.hasValue(LeaderInformation.known(leaderSessionID, TEST_URL));
 
                             leaderElection.close();
 
@@ -362,9 +365,9 @@ class DefaultLeaderElectionServiceTest {
                                     .as(
                                             "The LeaderElectionService should 
have its internal state cleaned.")
                                     .isNull();
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
+                            
assertThat(storedLeaderInformation.get().getRegisteredContenderIDs())
                                     .as("The HA backend's data should have 
been cleaned.")
-                                    .isEqualTo(LeaderInformation.empty());
+                                    .isEmpty();
                         });
             }
         };
@@ -372,28 +375,37 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testLeaderInformationChangedAndShouldBeCorrected() throws Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>();
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
+                            final UUID leaderSessionID = UUID.randomUUID();
+                            grantLeadership(leaderSessionID);
 
-                            final LeaderInformation expectedLeader =
-                                    LeaderInformation.known(
-                                            
leaderElectionService.getLeaderSessionID(contenderID),
-                                            TEST_URL);
+                            final LeaderInformation expectedLeaderInformation =
+                                    LeaderInformation.known(leaderSessionID, 
TEST_URL);
 
                             // Leader information changed on external storage. 
It should be
                             // corrected.
-                            
testingLeaderElectionDriver.leaderInformationChanged(
-                                    LeaderInformation.empty());
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
-                                    .isEqualTo(expectedLeader);
-
-                            
testingLeaderElectionDriver.leaderInformationChanged(
-                                    LeaderInformation.known(UUID.randomUUID(), 
"faulty-address"));
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
-                                    .isEqualTo(expectedLeader);
+                            
storedLeaderInformation.set(LeaderInformationRegister.empty());
+                            
leaderElectionService.notifyLeaderInformationChange(
+                                    contenderID, LeaderInformation.empty());
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .as("Removed leader information should 
have been reset.")
+                                    .hasValue(expectedLeaderInformation);
+
+                            final LeaderInformation faultyLeaderInformation =
+                                    LeaderInformation.known(UUID.randomUUID(), 
"faulty-address");
+                            storedLeaderInformation.set(
+                                    LeaderInformationRegister.of(
+                                            contenderID, 
faultyLeaderInformation));
+                            
leaderElectionService.notifyLeaderInformationChange(
+                                    contenderID, faultyLeaderInformation);
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .as("Overwritten leader information should 
have been reset.")
+                                    .hasValue(expectedLeaderInformation);
                         });
             }
         };
@@ -406,8 +418,7 @@ class DefaultLeaderElectionServiceTest {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
                             final UUID expectedSessionID = UUID.randomUUID();
-
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership(expectedSessionID);
 
                             assertThat(
                                             
leaderElectionService.hasLeadership(
@@ -429,10 +440,15 @@ class DefaultLeaderElectionServiceTest {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
                             final UUID expectedSessionID = UUID.randomUUID();
+                            grantLeadership(expectedSessionID);
+
+                            
assertThat(testingContender.getLeaderSessionID()).isNull();
 
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
                             executorService.trigger();
 
+                            assertThat(testingContender.getLeaderSessionID())
+                                    .isEqualTo(expectedSessionID);
+
                             assertThat(
                                             
leaderElectionService.hasLeadership(
                                                     contenderID, 
expectedSessionID))
@@ -452,11 +468,11 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
-                            final UUID expectedSessionID = UUID.randomUUID();
-
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership();
                             executorService.trigger();
-                            testingLeaderElectionDriver.notLeader();
+
+                            final UUID expectedSessionID = 
testingContender.getLeaderSessionID();
+                            revokeLeadership();
 
                             assertThat(
                                             
leaderElectionService.hasLeadership(
@@ -483,12 +499,12 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
-                            final UUID expectedSessionID = UUID.randomUUID();
-
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership();
                             executorService.trigger();
 
-                            testingLeaderElectionDriver.notLeader();
+                            final UUID expectedSessionID = 
testingContender.getLeaderSessionID();
+
+                            revokeLeadership();
                             executorService.trigger();
 
                             assertThat(
@@ -510,10 +526,10 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
-                            final UUID expectedSessionID = UUID.randomUUID();
-                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            grantLeadership();
                             executorService.trigger();
 
+                            final UUID expectedSessionID = 
testingContender.getLeaderSessionID();
                             leaderElection.close();
 
                             assertThat(
@@ -527,16 +543,23 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testLeaderInformationChangedIfNotBeingLeader() throws Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>();
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            final LeaderInformation faultyLeader =
-                                    LeaderInformation.known(UUID.randomUUID(), 
"faulty-address");
-                            
testingLeaderElectionDriver.leaderInformationChanged(faultyLeader);
-                            // External storage should keep the wrong value.
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
-                                    .isEqualTo(faultyLeader);
+                            final LeaderInformation differentLeaderInformation 
=
+                                    LeaderInformation.known(UUID.randomUUID(), 
"different-address");
+                            storedLeaderInformation.set(
+                                    LeaderInformationRegister.of(
+                                            contenderID, 
differentLeaderInformation));
+                            
leaderElectionService.notifyLeaderInformationChange(
+                                    contenderID, differentLeaderInformation);
+
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .as("The external storage shouldn't have 
been changed.")
+                                    .hasValue(differentLeaderInformation);
                         });
             }
         };
@@ -549,14 +572,16 @@ class DefaultLeaderElectionServiceTest {
                 runTestWithSynchronousEventHandling(
                         () -> {
                             leaderElection.close();
-                            testingLeaderElectionDriver.isLeader();
+                            grantLeadership();
 
                             
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .as(
                                             "The grant event shouldn't have 
been processed by the LeaderElectionService.")
                                     .isNull();
-                            // leader contender is not granted leadership
-                            
assertThat(testingContender.getLeaderSessionID()).isNull();
+                            assertThat(testingContender.getLeaderSessionID())
+                                    .as(
+                                            "The grant event shouldn't have 
been forwarded to the contender.")
+                                    .isNull();
                         });
             }
         };
@@ -564,19 +589,26 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testOnLeaderInformationChangeIsIgnoredAfterLeaderElectionBeingStop() 
throws Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>();
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
+                            grantLeadership();
+
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .isPresent();
 
                             leaderElection.close();
-                            
testingLeaderElectionDriver.leaderInformationChanged(
-                                    LeaderInformation.empty());
 
-                            // External storage should not be corrected
-                            
assertThat(testingLeaderElectionDriver.getLeaderInformation())
-                                    .isEqualTo(LeaderInformation.empty());
+                            
storedLeaderInformation.set(LeaderInformationRegister.empty());
+                            
leaderElectionService.notifyLeaderInformationChange(
+                                    contenderID, LeaderInformation.empty());
+
+                            
assertThat(storedLeaderInformation.get().getRegisteredContenderIDs())
+                                    .as("The external storage shouldn't have 
been corrected.")
+                                    .isEmpty();
                         });
             }
         };
@@ -588,7 +620,7 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
+                            grantLeadership();
                             final UUID oldSessionId =
                                     
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(testingContender.getLeaderSessionID())
@@ -607,19 +639,29 @@ class DefaultLeaderElectionServiceTest {
 
     @Test
     void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws 
Exception {
-        new Context() {
+        final AtomicReference<LeaderInformationRegister> 
storedLeaderInformation =
+                new AtomicReference<>();
+        new Context(storedLeaderInformation) {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
+                            grantLeadership();
                             final UUID currentLeaderSessionId =
                                     
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(currentLeaderSessionId).isNotNull();
+                            final LeaderInformation expectedLeaderInformation =
+                                    
LeaderInformation.known(currentLeaderSessionId, TEST_URL);
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .hasValue(expectedLeaderInformation);
 
                             // Old confirm call should be ignored.
                             
leaderElection.confirmLeadership(UUID.randomUUID(), TEST_URL);
                             
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
                                     .isEqualTo(currentLeaderSessionId);
+                            
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
+                                    .as(
+                                            "The leader information in the 
external storage shouldn't have been updated.")
+                                    .hasValue(expectedLeaderInformation);
                         });
             }
         };
@@ -631,12 +673,12 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            testingLeaderElectionDriver.isLeader();
+                            grantLeadership();
                             final UUID currentLeaderSessionId =
                                     
leaderElectionService.getLeaderSessionID(contenderID);
                             assertThat(currentLeaderSessionId).isNotNull();
 
-                            testingLeaderElectionDriver.notLeader();
+                            revokeLeadership();
 
                             // Old confirm call should be ignored.
                             
leaderElection.confirmLeadership(currentLeaderSessionId, TEST_URL);
@@ -656,9 +698,8 @@ class DefaultLeaderElectionServiceTest {
                         () -> {
                             final Exception testException = new 
Exception("test leader exception");
 
-                            
testingLeaderElectionDriver.onFatalError(testException);
+                            
testingLeaderElectionDriver.triggerFatalError(testException);
 
-                            testingContender.waitForError();
                             assertThat(testingContender.getError())
                                     .isNotNull()
                                     .hasCause(testException);
@@ -679,7 +720,7 @@ class DefaultLeaderElectionServiceTest {
 
                             leaderElection.close();
 
-                            
testingLeaderElectionDriver.onFatalError(testException);
+                            
testingLeaderElectionDriver.triggerFatalError(testException);
                             assertThat(testingContender.getError()).isNull();
 
                             assertThat(
@@ -696,62 +737,36 @@ class DefaultLeaderElectionServiceTest {
         };
     }
 
-    /**
-     * Tests that we can shut down the DefaultLeaderElectionService if the 
used LeaderElectionDriver
-     * holds an internal lock. See FLINK-20008 for more details.
-     */
-    @Test
-    void testServiceShutDownWithSynchronizedDriver() throws Exception {
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
-                testingLeaderElectionDriverFactory =
-                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
-        final DefaultLeaderElectionService leaderElectionService =
-                new DefaultLeaderElectionService(
-                        testingLeaderElectionDriverFactory,
-                        
fatalErrorHandlerExtension.getTestingFatalErrorHandler());
-        leaderElectionService.startLeaderElectionBackend();
-
-        final LeaderElection leaderElection =
-                
leaderElectionService.createLeaderElection(createRandomContenderID());
-        final TestingContender testingContender = new 
TestingContender(TEST_URL, leaderElection);
-        testingContender.startLeaderElection();
-
-        final TestingLeaderElectionDriver currentLeaderDriver =
-                Preconditions.checkNotNull(
-                        
testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
-
-        currentLeaderDriver.isLeader();
-
-        leaderElection.close();
-        leaderElectionService.close();
-
-        testingContender.throwErrorIfPresent();
-    }
-
     @Test
     void testOnLeadershipChangeDoesNotBlock() throws Exception {
         final CompletableFuture<LeaderInformation> initialLeaderInformation =
                 new CompletableFuture<>();
         final OneShotLatch latch = new OneShotLatch();
 
-        final TestingGenericLeaderElectionDriver driver =
-                TestingGenericLeaderElectionDriver.newBuilder()
-                        .setWriteLeaderInformationConsumer(
-                                leaderInformation -> {
-                                    // the first call saves the confirmed 
LeaderInformation
-                                    if (!initialLeaderInformation.isDone()) {
-                                        
initialLeaderInformation.complete(leaderInformation);
-                                    } else {
-                                        latch.awaitQuietly();
+        final AtomicBoolean leadershipGranted = new AtomicBoolean(false);
+        final TestingLeaderElectionDriver.Builder driverBuilder =
+                TestingLeaderElectionDriver.newBuilder(
+                                leadershipGranted, new AtomicReference<>(), 
new AtomicBoolean())
+                        .setPublishLeaderInformationConsumer(
+                                (lock, contenderID, leaderInformation) -> {
+                                    try {
+                                        lock.lock();
+                                        // the first call saves the confirmed 
LeaderInformation
+                                        if 
(!initialLeaderInformation.isDone()) {
+                                            
initialLeaderInformation.complete(leaderInformation);
+                                        } else {
+                                            latch.awaitQuietly();
+                                        }
+                                    } finally {
+                                        lock.unlock();
                                     }
                                 })
-                        .setHasLeadershipSupplier(() -> true)
-                        .build();
-
+                        .setHasLeadershipFunction(lock -> 
leadershipGranted.get());
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(driverBuilder);
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(
-                        (leaderElectionEventHandler, errorHandler) -> driver,
-                        
fatalErrorHandlerExtension.getTestingFatalErrorHandler());
+                        driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler());
         testInstance.startLeaderElectionBackend();
 
         final String contenderID = "contender-id";
@@ -767,7 +782,8 @@ class DefaultLeaderElectionServiceTest {
 
         // initial messages to initialize usedLeaderSessionID and 
confirmedLeaderInformation
         final UUID sessionID = UUID.randomUUID();
-        testInstance.onGrantLeadership(sessionID);
+        leadershipGranted.set(true);
+        testInstance.isLeader(sessionID);
 
         FlinkAssertions.assertThatFuture(initialLeaderInformation)
                 .eventuallySucceeds()
@@ -791,7 +807,10 @@ class DefaultLeaderElectionServiceTest {
                                 .setGrantLeadershipConsumer(
                                         ignoredSessionID -> 
latch.awaitQuietly())
                                 .build(),
-                TestingLeaderElectionDriver::isLeader);
+                (leadershipGranted, listener) -> {
+                    leadershipGranted.set(true);
+                    listener.isLeader(UUID.randomUUID());
+                });
     }
 
     @Test
@@ -801,35 +820,39 @@ class DefaultLeaderElectionServiceTest {
                         TestingGenericLeaderContender.newBuilder()
                                 
.setRevokeLeadershipRunnable(latch::awaitQuietly)
                                 .build(),
-                driver -> {
-                    driver.isLeader();
+                (leadershipGranted, listener) -> {
+                    leadershipGranted.set(true);
+                    listener.isLeader(UUID.randomUUID());
+
+                    leadershipGranted.set(false);
                     // this call should not block the test execution
-                    driver.notLeader();
+                    listener.notLeader();
                 });
     }
 
     private void testNonBlockingCall(
             Function<OneShotLatch, TestingGenericLeaderContender> 
contenderCreator,
-            Consumer<TestingLeaderElectionDriver> driverAction)
+            BiConsumer<AtomicBoolean, 
MultipleComponentLeaderElectionDriver.Listener>
+                    listenerAction)
             throws Exception {
         final OneShotLatch latch = new OneShotLatch();
         final TestingGenericLeaderContender contender = 
contenderCreator.apply(latch);
 
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
-                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
-
+        final AtomicBoolean leadershipGranted = new AtomicBoolean(false);
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(
+                                leadershipGranted, new AtomicReference<>(), 
new AtomicBoolean()));
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(
                         driverFactory, 
fatalErrorHandlerExtension.getTestingFatalErrorHandler());
         testInstance.startLeaderElectionBackend();
+
         final LeaderElection leaderElection =
                 testInstance.createLeaderElection(createRandomContenderID());
         leaderElection.startLeaderElection(contender);
 
-        final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
-        assertThat(driver).isNotNull();
-
-        driverAction.accept(driver);
+        listenerAction.accept(leadershipGranted, testInstance);
 
         latch.trigger();
 
@@ -843,6 +866,10 @@ class DefaultLeaderElectionServiceTest {
 
     private class Context {
 
+        private final TestingLeaderElectionDriver.Factory driverFactory;
+
+        private final AtomicBoolean leadershipGranted;
+
         final String contenderID = createRandomContenderID();
 
         DefaultLeaderElectionService leaderElectionService;
@@ -852,6 +879,44 @@ class DefaultLeaderElectionServiceTest {
 
         LeaderElection leaderElection;
 
+        private Context() {
+            this(new AtomicBoolean(false), new AtomicReference<>());
+        }
+
+        private Context(AtomicReference<LeaderInformationRegister> 
storedLeaderInformation) {
+            this(new AtomicBoolean(false), storedLeaderInformation);
+        }
+
+        private Context(
+                AtomicBoolean leadershipGranted,
+                AtomicReference<LeaderInformationRegister> 
storedLeaderInformation) {
+            this(
+                    leadershipGranted,
+                    TestingLeaderElectionDriver.newBuilder(
+                            leadershipGranted, storedLeaderInformation, new 
AtomicBoolean()));
+        }
+
+        private Context(
+                AtomicBoolean leadershipGranted,
+                TestingLeaderElectionDriver.Builder driverBuilder) {
+            this.leadershipGranted = leadershipGranted;
+            this.driverFactory = new 
TestingLeaderElectionDriver.Factory(driverBuilder);
+        }
+
+        void grantLeadership() {
+            grantLeadership(UUID.randomUUID());
+        }
+
+        void grantLeadership(UUID leaderSessionID) {
+            leadershipGranted.set(true);
+            leaderElectionService.isLeader(leaderSessionID);
+        }
+
+        void revokeLeadership() {
+            leadershipGranted.set(false);
+            leaderElectionService.notLeader();
+        }
+
         void runTestWithSynchronousEventHandling(RunnableWithException 
testMethod)
                 throws Exception {
             runTest(testMethod, Executors.newDirectExecutorService());
@@ -868,8 +933,6 @@ class DefaultLeaderElectionServiceTest {
         void runTest(RunnableWithException testMethod, ExecutorService 
leaderEventOperationExecutor)
                 throws Exception {
             try {
-                final 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
-                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
                 leaderElectionService =
                         new DefaultLeaderElectionService(
                                 driverFactory,
@@ -877,14 +940,12 @@ class DefaultLeaderElectionServiceTest {
                                         .getTestingFatalErrorHandler(),
                                 leaderEventOperationExecutor);
                 leaderElectionService.startLeaderElectionBackend();
+                testingLeaderElectionDriver = 
driverFactory.assertAndGetOnlyCreatedDriver();
 
                 leaderElection = 
leaderElectionService.createLeaderElection(contenderID);
                 testingContender = new TestingContender(TEST_URL, 
leaderElection);
                 testingContender.startLeaderElection();
 
-                testingLeaderElectionDriver = 
driverFactory.getCurrentLeaderDriver();
-
-                assertThat(testingLeaderElectionDriver).isNotNull();
                 testMethod.run();
             } finally {
                 if (leaderElection != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
index 7d7bd947dc6..a55941deed7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -47,11 +48,8 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void isLeaderInformsAllRegisteredLeaderElectionEventHandlers() throws 
Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
-
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService();
 
         try {
             final Collection<SimpleTestingLeaderElectionEventListener> 
eventListeners =
@@ -66,7 +64,7 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
                 counter++;
             }
 
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             for (SimpleTestingLeaderElectionEventListener eventListener : 
eventListeners) {
                 assertThat(eventListener.hasLeadership()).isTrue();
@@ -76,23 +74,27 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
         }
     }
 
+    private DefaultMultipleComponentLeaderElectionService
+            createDefaultMultiplexingLeaderElectionService() throws Exception {
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
+        return createDefaultMultiplexingLeaderElectionService(driverFactory);
+    }
+
     private DefaultMultipleComponentLeaderElectionService
             createDefaultMultiplexingLeaderElectionService(
-                    TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver)
-                    throws Exception {
+                    TestingLeaderElectionDriver.Factory driverFactory) throws 
Exception {
         return new DefaultMultipleComponentLeaderElectionService(
                 fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
-                new 
TestingMultipleComponentLeaderElectionDriverFactory(leaderElectionDriver),
+                driverFactory,
                 Executors.newDirectExecutorService());
     }
 
     @Test
     void notLeaderInformsAllRegisteredLeaderElectionEventHandlers() throws 
Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
-
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService();
 
         try {
             final Collection<SimpleTestingLeaderElectionEventListener> 
eventListeners =
@@ -107,8 +109,8 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
                 counter++;
             }
 
-            leaderElectionDriver.grantLeadership();
-            leaderElectionDriver.revokeLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
+            leaderElectionService.notLeader();
 
             for (SimpleTestingLeaderElectionEventListener eventListener : 
eventListeners) {
                 assertThat(eventListener.hasLeadership()).isFalse();
@@ -120,17 +122,20 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void handleFatalError() throws Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
 
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService(driverFactory);
+        final TestingLeaderElectionDriver leaderElectionDriver =
+                driverFactory.assertAndGetOnlyCreatedDriver();
 
         try {
             final Throwable expectedFatalError =
                     new Exception("Expected exception simulating a fatal 
error.");
 
-            leaderElectionDriver.triggerErrorHandling(expectedFatalError);
+            leaderElectionDriver.triggerFatalError(expectedFatalError);
 
             FlinkAssertions.assertThatFuture(
                             fatalErrorHandlerExtension
@@ -146,11 +151,12 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void unregisteredEventHandlersAreNotNotified() throws Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
 
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService(driverFactory);
 
         try {
             final SimpleTestingLeaderElectionEventListener 
leaderElectionEventHandler =
@@ -160,7 +166,7 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
                     componentId, leaderElectionEventHandler);
             
leaderElectionService.unregisterLeaderElectionEventHandler(componentId);
 
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             assertThat(leaderElectionEventHandler.hasLeadership()).isFalse();
         } finally {
@@ -170,13 +176,15 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void newlyRegisteredEventHandlersAreInformedAboutLeadership() throws 
Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
+
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService(driverFactory);
 
         try {
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             final SimpleTestingLeaderElectionEventListener 
leaderElectionEventHandler =
                     new SimpleTestingLeaderElectionEventListener();
@@ -191,10 +199,12 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     public void testLeaderSessionIdMatchesBetweenComponents() throws Exception 
{
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
+
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService(driverFactory);
 
         try {
             final Component preLeadershipGrantedComponent =
@@ -212,7 +222,7 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
                     preLeadershipGrantedComponent.getComponentId(),
                     
preLeadershipGrantedComponent.getLeaderElectionEventListener());
 
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             leaderElectionService.registerLeaderElectionEventHandler(
                     postLeadershipGrantedComponent.getComponentId(),
@@ -236,13 +246,15 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void allKnownLeaderInformationCallsEventHandlers() throws Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
+
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
-                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+                createDefaultMultiplexingLeaderElectionService(driverFactory);
 
         try {
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             final Collection<Component> knownLeaderInformation = 
createComponents(3);
             final Collection<Component> unknownLeaderInformation = 
createComponents(2);
@@ -275,16 +287,18 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
 
     @Test
     void allKnownLeaderInformationDoesNotBlock() throws Exception {
-        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+        final TestingLeaderElectionDriver.Factory driverFactory =
+                new TestingLeaderElectionDriver.Factory(
+                        TestingLeaderElectionDriver.newBuilder(new 
AtomicBoolean()));
+
         final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
                 new DefaultMultipleComponentLeaderElectionService(
                         
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
-                        new 
TestingMultipleComponentLeaderElectionDriverFactory(
-                                leaderElectionDriver),
+                        driverFactory,
                         
java.util.concurrent.Executors.newSingleThreadScheduledExecutor());
+
         try {
-            leaderElectionDriver.grantLeadership();
+            leaderElectionService.isLeader(UUID.randomUUID());
 
             final String knownLeaderInformationComponent = 
"knownLeaderInformationComponent";
             final BlockingLeaderElectionEventHandler 
knownLeaderElectionEventHandler =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionEvent.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionEvent.java
index b9a8cd5cd11..cf7ffcb230d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionEvent.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionEvent.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import java.util.UUID;
+
 /** Leader election event. */
 public abstract class LeaderElectionEvent {
     public boolean isIsLeaderEvent() {
@@ -49,6 +51,17 @@ public abstract class LeaderElectionEvent {
     }
 
     public static class IsLeaderEvent extends LeaderElectionEvent {
+
+        private final UUID leaderSessionID;
+
+        public IsLeaderEvent(UUID leaderSessionID) {
+            this.leaderSessionID = leaderSessionID;
+        }
+
+        public UUID getLeaderSessionID() {
+            return leaderSessionID;
+        }
+
         @Override
         public boolean isIsLeaderEvent() {
             return true;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index dcccfe2890f..7516d7659b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -155,7 +155,6 @@ public class LeaderElectionTest {
         LeaderElection createLeaderElection() throws Exception;
     }
 
-    @Deprecated
     private static final class ZooKeeperServiceClass implements ServiceClass {
 
         private TestingServer testingServer;
@@ -181,9 +180,11 @@ public class LeaderElectionTest {
             curatorFrameworkWrapper =
                     ZooKeeperUtils.startCuratorFramework(configuration, 
fatalErrorHandler);
 
-            leaderElectionService =
-                    ZooKeeperUtils.createLeaderElectionService(
+            final MultipleComponentLeaderElectionDriverFactory driverFactory =
+                    new ZooKeeperMultipleComponentLeaderElectionDriverFactory(
                             curatorFrameworkWrapper.asCuratorFramework());
+            leaderElectionService = new 
DefaultLeaderElectionService(driverFactory);
+            leaderElectionService.startLeaderElectionBackend();
         }
 
         @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java
deleted file mode 100644
index ba99288968b..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.util.function.ThrowingRunnable;
-
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/**
- * {@code TestingGenericLeaderElectionDriver} is test implementation of {@link 
LeaderElectionDriver}
- * to support test cases in the most generic way.
- */
-public class TestingGenericLeaderElectionDriver implements 
LeaderElectionDriver {
-
-    private final Consumer<LeaderInformation> writeLeaderInformationConsumer;
-    private final Supplier<Boolean> hasLeadershipSupplier;
-    private final ThrowingRunnable<Exception> closeRunnable;
-
-    private TestingGenericLeaderElectionDriver(
-            Consumer<LeaderInformation> writeLeaderInformationConsumer,
-            Supplier<Boolean> hasLeadershipSupplier,
-            ThrowingRunnable<Exception> closeRunnable) {
-        this.writeLeaderInformationConsumer = writeLeaderInformationConsumer;
-        this.hasLeadershipSupplier = hasLeadershipSupplier;
-        this.closeRunnable = closeRunnable;
-    }
-
-    @Override
-    public void writeLeaderInformation(LeaderInformation leaderInformation) {
-        writeLeaderInformationConsumer.accept(leaderInformation);
-    }
-
-    @Override
-    public boolean hasLeadership() {
-        return hasLeadershipSupplier.get();
-    }
-
-    @Override
-    public void close() throws Exception {
-        closeRunnable.run();
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /** {@code Builder} for creating {@code TestingGenericLeaderContender} 
instances. */
-    public static class Builder {
-        private Consumer<LeaderInformation> writeLeaderInformationConsumer =
-                ignoredLeaderInformation -> {};
-        private Supplier<Boolean> hasLeadershipSupplier = () -> false;
-        private ThrowingRunnable<Exception> closeRunnable = () -> {};
-
-        private Builder() {}
-
-        public Builder setWriteLeaderInformationConsumer(
-                Consumer<LeaderInformation> writeLeaderInformationConsumer) {
-            this.writeLeaderInformationConsumer = 
writeLeaderInformationConsumer;
-            return this;
-        }
-
-        public Builder setHasLeadershipSupplier(Supplier<Boolean> 
hasLeadershipSupplier) {
-            this.hasLeadershipSupplier = hasLeadershipSupplier;
-            return this;
-        }
-
-        public Builder setCloseRunnable(ThrowingRunnable<Exception> 
closeRunnable) {
-            this.closeRunnable = closeRunnable;
-            return this;
-        }
-
-        public TestingGenericLeaderElectionDriver build() {
-            return new TestingGenericLeaderElectionDriver(
-                    writeLeaderInformationConsumer, hasLeadershipSupplier, 
closeRunnable);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
index e7b411dd046..ae87846b46b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
@@ -19,147 +19,266 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.TriConsumer;
 
-import javax.annotation.Nullable;
-
-import java.util.UUID;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 /**
- * {@link LeaderElectionDriver} implementation which provides some convenience 
functions for testing
- * purposes. Please use {@link #isLeader} and {@link #notLeader()} to manually 
control the
- * leadership.
+ * {@code TestingLeaderElectionDriver} is a generic test implementation of 
{@link
+ * MultipleComponentLeaderElectionDriver} which can be used in test cases.
  */
-public class TestingLeaderElectionDriver implements LeaderElectionDriver {
-
-    private final Object lock = new Object();
+public class TestingLeaderElectionDriver implements 
MultipleComponentLeaderElectionDriver {
 
-    private final AtomicBoolean isLeader = new AtomicBoolean(false);
-    private final LeaderElectionEventHandler leaderElectionEventHandler;
-    private final FatalErrorHandler fatalErrorHandler;
+    private final Function<ReentrantLock, Boolean> hasLeadershipFunction;
+    private final TriConsumer<ReentrantLock, String, LeaderInformation>
+            publishLeaderInformationConsumer;
+    private final BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer;
 
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
-    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable;
+    private final ThrowingConsumer<ReentrantLock, Exception> closeConsumer;
 
-    private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+    private final ReentrantLock lock = new ReentrantLock();
 
-    // Leader information on external storage
-    private LeaderInformation leaderInformation = LeaderInformation.empty();
+    private final FatalErrorHandler fatalErrorHandler;
 
-    private TestingLeaderElectionDriver(
-            LeaderElectionEventHandler leaderElectionEventHandler,
+    public TestingLeaderElectionDriver(
             FatalErrorHandler fatalErrorHandler,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable,
-            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable,
-            Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
-        this.leaderElectionEventHandler = leaderElectionEventHandler;
+            Function<ReentrantLock, Boolean> hasLeadershipFunction,
+            TriConsumer<ReentrantLock, String, LeaderInformation> 
publishLeaderInformationConsumer,
+            BiConsumer<ReentrantLock, String> deleteLeaderInformationConsumer,
+            ThrowingConsumer<ReentrantLock, Exception> closeConsumer) {
         this.fatalErrorHandler = fatalErrorHandler;
-        this.closeRunnable = closeRunnable;
-        this.beforeLockCloseRunnable = beforeLockCloseRunnable;
-        this.beforeGrantRunnable = beforeGrantRunnable;
+
+        this.hasLeadershipFunction = hasLeadershipFunction;
+        this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
+        this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
+        this.closeConsumer = closeConsumer;
     }
 
     @Override
-    public void writeLeaderInformation(LeaderInformation leaderInformation) {
-        this.leaderInformation = leaderInformation;
+    public boolean hasLeadership() {
+        return hasLeadershipFunction.apply(lock);
     }
 
     @Override
-    public boolean hasLeadership() {
-        return isLeader.get();
+    public void publishLeaderInformation(String contenderID, LeaderInformation 
leaderInformation) {
+        publishLeaderInformationConsumer.accept(lock, contenderID, 
leaderInformation);
+    }
+
+    @Override
+    public void deleteLeaderInformation(String contenderID) {
+        deleteLeaderInformationConsumer.accept(lock, contenderID);
     }
 
     @Override
     public void close() throws Exception {
-        beforeLockCloseRunnable.accept(leaderElectionEventHandler);
-        synchronized (lock) {
-            closeRunnable.accept(leaderElectionEventHandler);
-        }
+        closeConsumer.accept(lock);
     }
 
-    public LeaderInformation getLeaderInformation() {
-        return leaderInformation;
+    public void triggerFatalError(Throwable t) {
+        fatalErrorHandler.onFatalError(t);
     }
 
-    public void isLeader(UUID newSessionID) {
-        synchronized (lock) {
-            isLeader.set(true);
-            beforeGrantRunnable.accept(leaderElectionEventHandler);
-            leaderElectionEventHandler.onGrantLeadership(newSessionID);
-        }
+    public static Builder newNoOpBuilder() {
+        return new Builder();
     }
 
-    public void isLeader() {
-        isLeader(UUID.randomUUID());
+    public ReentrantLock getLock() {
+        return lock;
     }
 
-    public void notLeader() {
-        synchronized (lock) {
-            isLeader.set(false);
-            leaderElectionEventHandler.onRevokeLeadership();
-        }
+    public static Builder newBuilder(AtomicBoolean grantLeadership) {
+        return newBuilder(grantLeadership, new AtomicReference<>(), new 
AtomicBoolean());
     }
 
-    public void leaderInformationChanged(LeaderInformation newLeader) {
-        leaderInformation = newLeader;
-        leaderElectionEventHandler.onLeaderInformationChange(newLeader);
+    /**
+     * Returns a {@code Builder} that comes with a basic default 
implementation of the {@link
+     * MultipleComponentLeaderElectionDriver} contract using the passed 
parameters for information
+     * storage.
+     *
+     * @param hasLeadership saves the current leadership state of the instance 
that is created from
+     *     the {@code Builder}.
+     * @param storedLeaderInformation saves the leader information that would 
be otherwise stored in
+     *     some external storage.
+     * @param isClosed saves the running state of the driver.
+     */
+    public static Builder newBuilder(
+            AtomicBoolean hasLeadership,
+            AtomicReference<LeaderInformationRegister> storedLeaderInformation,
+            AtomicBoolean isClosed) {
+        Preconditions.checkState(
+                storedLeaderInformation.get() == null
+                        || !storedLeaderInformation
+                                .get()
+                                .getRegisteredContenderIDs()
+                                .iterator()
+                                .hasNext(),
+                "Initial state check for storedLeaderInformation failed.");
+        Preconditions.checkState(!isClosed.get(), "Initial state check for 
isClosed failed.");
+        return newNoOpBuilder()
+                .setHasLeadershipFunction(
+                        lock -> {
+                            try {
+                                lock.lock();
+                                return hasLeadership.get();
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setPublishLeaderInformationConsumer(
+                        (lock, contenderID, leaderInformation) -> {
+                            try {
+                                lock.lock();
+                                if (hasLeadership.get()) {
+                                    storedLeaderInformation.getAndUpdate(
+                                            oldData ->
+                                                    
LeaderInformationRegister.merge(
+                                                            oldData,
+                                                            contenderID,
+                                                            
leaderInformation));
+                                }
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setDeleteLeaderInformationConsumer(
+                        (lock, contenderID) -> {
+                            try {
+                                lock.lock();
+                                if (hasLeadership.get()) {
+                                    storedLeaderInformation.getAndUpdate(
+                                            oldData ->
+                                                    
LeaderInformationRegister.clear(
+                                                            oldData, 
contenderID));
+                                }
+                            } finally {
+                                lock.unlock();
+                            }
+                        })
+                .setCloseConsumer(
+                        lock -> {
+                            try {
+                                lock.lock();
+                                isClosed.set(true);
+                            } finally {
+                                lock.unlock();
+                            }
+                        });
     }
 
-    public void onFatalError(Throwable throwable) {
-        fatalErrorHandler.onFatalError(throwable);
+    /**
+     * {@code Factory} implements {@link 
MultipleComponentLeaderElectionDriverFactory} for the
+     * {@code TestingLeaderElectionDriver}.
+     */
+    public static class Factory implements 
MultipleComponentLeaderElectionDriverFactory {
+
+        private final Builder driverBuilder;
+
+        private final Queue<TestingLeaderElectionDriver> createdDrivers =
+                new ConcurrentLinkedQueue<>();
+
+        public static Factory createFactoryWithNoOpDriver() {
+            return new Factory(TestingLeaderElectionDriver.newNoOpBuilder());
+        }
+
+        public static Factory defaultDriverFactory(
+                AtomicBoolean hasLeadership,
+                AtomicReference<LeaderInformationRegister> 
storedLeaderInformation,
+                AtomicBoolean isClosed) {
+            return new Factory(
+                    TestingLeaderElectionDriver.newBuilder(
+                            hasLeadership, storedLeaderInformation, isClosed));
+        }
+
+        public Factory(Builder driverBuilder) {
+            this.driverBuilder = driverBuilder;
+        }
+
+        @Override
+        public MultipleComponentLeaderElectionDriver create(
+                Listener leaderElectionListener, FatalErrorHandler 
fatalErrorHandler)
+                throws Exception {
+            final TestingLeaderElectionDriver driver =
+                    driverBuilder.build(leaderElectionListener, 
fatalErrorHandler);
+            createdDrivers.add(driver);
+
+            return driver;
+        }
+
+        /**
+         * Returns the {@link TestingLeaderElectionDriver} instance that was 
created by this {@code
+         * Factory} and verifies that no other driver was created.
+         *
+         * @return The only {@code MultipleComponentLeaderElectionDriver} that 
was created by this
+         *     {@code Factory}.
+         * @throws AssertionError if no {@code 
MultipleComponentLeaderElectionDriver} or more than
+         *     one instance was created by this {@code Factory}.
+         */
+        public TestingLeaderElectionDriver assertAndGetOnlyCreatedDriver() {
+            final TestingLeaderElectionDriver driver = createdDrivers.poll();
+            if (driver == null) {
+                throw new AssertionError("No driver was created by this 
factory, yet.");
+            } else if (!createdDrivers.isEmpty()) {
+                throw new AssertionError("More than one driver was created by 
this factory.");
+            }
+
+            return driver;
+        }
     }
 
-    /** Factory for create {@link TestingLeaderElectionDriver}. */
-    public static class TestingLeaderElectionDriverFactory implements 
LeaderElectionDriverFactory {
+    /** {@link Builder} for creating {@link TestingLeaderElectionDriver} 
instances. */
+    public static class Builder {
 
-        private TestingLeaderElectionDriver currentLeaderDriver;
+        private Function<ReentrantLock, Boolean> hasLeadershipFunction = 
ignoredLock -> false;
+        private TriConsumer<ReentrantLock, String, LeaderInformation>
+                publishLeaderInformationConsumer =
+                        (ignoredLock, ignoredContenderID, 
ignoredLeaderInformation) -> {};
+        private BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer =
+                (ignoredLock, ignoredContenderID) -> {};
 
-        private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
-        private final ThrowingConsumer<LeaderElectionEventHandler, Exception>
-                beforeLockCloseRunnable;
+        private ThrowingConsumer<ReentrantLock, Exception> closeConsumer = 
(ignoredLock) -> {};
 
-        private final Consumer<LeaderElectionEventHandler> beforeGrantRunnable;
+        private Builder() {}
 
-        public TestingLeaderElectionDriverFactory() {
-            this(ignoredLeaderElectionEventHandler -> {});
+        public Builder setHasLeadershipFunction(
+                Function<ReentrantLock, Boolean> hasLeadershipFunction) {
+            this.hasLeadershipFunction = hasLeadershipFunction;
+            return this;
         }
 
-        public TestingLeaderElectionDriverFactory(
-                ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable) {
-            this(
-                    closeRunnable,
-                    ignoredLeaderElectionEventHandler -> {},
-                    ignoredLeaderElectionEventHandler -> {});
+        public Builder setPublishLeaderInformationConsumer(
+                TriConsumer<ReentrantLock, String, LeaderInformation>
+                        publishLeaderInformationConsumer) {
+            this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
+            return this;
         }
 
-        public TestingLeaderElectionDriverFactory(
-                ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable,
-                ThrowingConsumer<LeaderElectionEventHandler, Exception> 
beforeLockCloseRunnable,
-                Consumer<LeaderElectionEventHandler> beforeGrantRunnable) {
-            this.closeRunnable = closeRunnable;
-            this.beforeLockCloseRunnable = beforeLockCloseRunnable;
-            this.beforeGrantRunnable = beforeGrantRunnable;
+        public Builder setDeleteLeaderInformationConsumer(
+                BiConsumer<ReentrantLock, String> 
deleteLeaderInformationConsumer) {
+            this.deleteLeaderInformationConsumer = 
deleteLeaderInformationConsumer;
+            return this;
         }
 
-        @Override
-        public LeaderElectionDriver createLeaderElectionDriver(
-                LeaderElectionEventHandler leaderEventHandler,
-                FatalErrorHandler fatalErrorHandler) {
-            currentLeaderDriver =
-                    new TestingLeaderElectionDriver(
-                            leaderEventHandler,
-                            fatalErrorHandler,
-                            closeRunnable,
-                            beforeLockCloseRunnable,
-                            beforeGrantRunnable);
-            return currentLeaderDriver;
+        public Builder setCloseConsumer(ThrowingConsumer<ReentrantLock, 
Exception> closeConsumer) {
+            this.closeConsumer = closeConsumer;
+            return this;
         }
 
-        @Nullable
-        public TestingLeaderElectionDriver getCurrentLeaderDriver() {
-            return currentLeaderDriver;
+        public TestingLeaderElectionDriver build(
+                Listener ignoredListener, FatalErrorHandler fatalErrorHandler) 
{
+            return new TestingLeaderElectionDriver(
+                    fatalErrorHandler,
+                    hasLeadershipFunction,
+                    publishLeaderInformationConsumer,
+                    deleteLeaderInformationConsumer,
+                    closeConsumer);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
deleted file mode 100644
index 5025e02de4a..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.UUID;
-import java.util.function.Consumer;
-
-/**
- * {@link LeaderElectionEventHandler} implementation which provides some 
convenience functions for
- * testing purposes.
- */
-public class TestingLeaderElectionEventHandler extends TestingLeaderBase
-        implements LeaderElectionEventHandler, AutoCloseable {
-
-    private final Object lock = new Object();
-
-    private final String leaderAddress;
-
-    private final OneShotLatch initializationLatch;
-
-    private final Consumer<LeaderInformation> leaderInformationConsumer;
-
-    @Nullable private LeaderElectionDriver initializedLeaderElectionDriver = 
null;
-
-    private LeaderInformation confirmedLeaderInformation = 
LeaderInformation.empty();
-
-    private boolean running = true;
-
-    public TestingLeaderElectionEventHandler(String leaderAddress) {
-        this.leaderAddress = leaderAddress;
-        this.initializationLatch = new OneShotLatch();
-        this.leaderInformationConsumer = (ignore) -> {};
-    }
-
-    public TestingLeaderElectionEventHandler(
-            String leaderAddress, Consumer<LeaderInformation> 
leaderInformationConsumer) {
-        this.leaderAddress = leaderAddress;
-        this.initializationLatch = new OneShotLatch();
-        this.leaderInformationConsumer = leaderInformationConsumer;
-    }
-
-    public void init(LeaderElectionDriver leaderElectionDriver) {
-        Preconditions.checkState(initializedLeaderElectionDriver == null);
-        this.initializedLeaderElectionDriver = leaderElectionDriver;
-        initializationLatch.trigger();
-    }
-
-    private void ifRunning(Runnable action) {
-        synchronized (lock) {
-            if (running) {
-                action.run();
-            }
-        }
-    }
-
-    @Override
-    public void onGrantLeadership(UUID newLeaderSessionId) {
-        ifRunning(
-                () ->
-                        waitForInitialization(
-                                leaderElectionDriver -> {
-                                    confirmedLeaderInformation =
-                                            LeaderInformation.known(
-                                                    newLeaderSessionId, 
leaderAddress);
-                                    
leaderElectionDriver.writeLeaderInformation(
-                                            confirmedLeaderInformation);
-                                    
leaderEventQueue.offer(confirmedLeaderInformation);
-                                }));
-    }
-
-    @Override
-    public void onRevokeLeadership() {
-        ifRunning(
-                () ->
-                        waitForInitialization(
-                                (leaderElectionDriver) -> {
-                                    confirmedLeaderInformation = 
LeaderInformation.empty();
-                                    
leaderElectionDriver.writeLeaderInformation(
-                                            confirmedLeaderInformation);
-                                    
leaderEventQueue.offer(confirmedLeaderInformation);
-                                }));
-    }
-
-    @Override
-    public void onLeaderInformationChange(LeaderInformation leaderInformation) 
{
-        ifRunning(
-                () ->
-                        waitForInitialization(
-                                leaderElectionDriver -> {
-                                    
leaderInformationConsumer.accept(leaderInformation);
-                                    if 
(confirmedLeaderInformation.getLeaderSessionID() != null
-                                            && 
!this.confirmedLeaderInformation.equals(
-                                                    leaderInformation)) {
-                                        
leaderElectionDriver.writeLeaderInformation(
-                                                confirmedLeaderInformation);
-                                    }
-                                }));
-    }
-
-    private void waitForInitialization(Consumer<? super LeaderElectionDriver> 
operation) {
-        try {
-            initializationLatch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-
-        Preconditions.checkState(initializedLeaderElectionDriver != null);
-        operation.accept(initializedLeaderElectionDriver);
-    }
-
-    public LeaderInformation getConfirmedLeaderInformation() {
-        synchronized (lock) {
-            return confirmedLeaderInformation;
-        }
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            running = false;
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionListener.java
index a82e3ffb7dc..873c5c947c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionListener.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionListener.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.time.Duration;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -37,8 +38,8 @@ public final class TestingLeaderElectionListener
             new ArrayBlockingQueue<>(10);
 
     @Override
-    public void isLeader() {
-        put(new LeaderElectionEvent.IsLeaderEvent());
+    public void isLeader(UUID leaderSessionID) {
+        put(new LeaderElectionEvent.IsLeaderEvent(leaderSessionID));
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
deleted file mode 100644
index 6462f34e23c..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Optional;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-
-/** Testing implementation of {@link MultipleComponentLeaderElectionDriver}. */
-public class TestingMultipleComponentLeaderElectionDriver
-        implements MultipleComponentLeaderElectionDriver {
-
-    private final BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer;
-    private final Consumer<String> deleteLeaderInformationConsumer;
-    private boolean hasLeadership;
-
-    private Optional<Listener> listener;
-    private Optional<FatalErrorHandler> fatalErrorHandler;
-
-    private TestingMultipleComponentLeaderElectionDriver(
-            BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer,
-            Consumer<String> deleteLeaderInformationConsumer) {
-        this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
-        this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
-        hasLeadership = false;
-        listener = Optional.empty();
-        fatalErrorHandler = Optional.empty();
-    }
-
-    public void grantLeadership() {
-        if (!hasLeadership) {
-            hasLeadership = true;
-            listener.ifPresent(Listener::isLeader);
-        }
-    }
-
-    public void revokeLeadership() {
-        if (hasLeadership) {
-            hasLeadership = false;
-            listener.ifPresent(Listener::notLeader);
-        }
-    }
-
-    public void triggerErrorHandling(Throwable throwable) {
-        fatalErrorHandler.ifPresent(handler -> 
handler.onFatalError(throwable));
-    }
-
-    public void initializeDriver(Listener listener, FatalErrorHandler 
fatalErrorHandler) {
-        Preconditions.checkState(!this.listener.isPresent(), "Can only set a 
single listener.");
-        this.listener = Optional.of(listener);
-        this.fatalErrorHandler = Optional.of(fatalErrorHandler);
-    }
-
-    @Override
-    public void close() throws Exception {}
-
-    @Override
-    public boolean hasLeadership() {
-        return hasLeadership;
-    }
-
-    @Override
-    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation) {
-        publishLeaderInformationConsumer.accept(componentId, 
leaderInformation);
-    }
-
-    @Override
-    public void deleteLeaderInformation(String componentId) {
-        deleteLeaderInformationConsumer.accept(componentId);
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    public static final class Builder {
-        private BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer =
-                (ignoredA, ignoredB) -> {};
-        private Consumer<String> deleteLeaderInformationConsumer = ignored -> 
{};
-
-        public Builder setPublishLeaderInformationConsumer(
-                BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer) {
-            this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
-            return this;
-        }
-
-        public Builder setDeleteLeaderInformationConsumer(
-                Consumer<String> deleteLeaderInformationConsumer) {
-            this.deleteLeaderInformationConsumer = 
deleteLeaderInformationConsumer;
-            return this;
-        }
-
-        public TestingMultipleComponentLeaderElectionDriver build() {
-            return new TestingMultipleComponentLeaderElectionDriver(
-                    publishLeaderInformationConsumer, 
deleteLeaderInformationConsumer);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
deleted file mode 100644
index bb27d9dd29e..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-
-/**
- * Testing implementation of {@link 
MultipleComponentLeaderElectionDriverFactory} that returns a
- * given {@link MultipleComponentLeaderElectionDriver}.
- */
-public class TestingMultipleComponentLeaderElectionDriverFactory
-        implements MultipleComponentLeaderElectionDriverFactory {
-
-    final TestingMultipleComponentLeaderElectionDriver 
testingMultipleComponentLeaderElectionDriver;
-
-    public TestingMultipleComponentLeaderElectionDriverFactory(
-            TestingMultipleComponentLeaderElectionDriver
-                    testingMultipleComponentLeaderElectionDriver) {
-        this.testingMultipleComponentLeaderElectionDriver =
-                testingMultipleComponentLeaderElectionDriver;
-    }
-
-    @Override
-    public MultipleComponentLeaderElectionDriver create(
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
-            FatalErrorHandler fatalErrorHandler)
-            throws Exception {
-        testingMultipleComponentLeaderElectionDriver.initializeDriver(
-                leaderElectionListener, fatalErrorHandler);
-
-        return testingMultipleComponentLeaderElectionDriver;
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 35f6deac0a1..4c9726bcb26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -41,12 +41,11 @@ import java.util.UUID;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Test behaviors of {@link ZooKeeperLeaderElectionDriver} when losing the 
connection to ZooKeeper.
+ * Test behaviors of {@link ZooKeeperMultipleComponentLeaderElectionDriver} 
when losing the
+ * connection to ZooKeeper.
  */
 class ZooKeeperLeaderElectionConnectionHandlingTest {
 
-    private static final String PATH = "/path";
-
     @RegisterExtension
     private static final EachCallbackWrapper<ZooKeeperExtension> 
zooKeeperResource =
             new EachCallbackWrapper<>(new ZooKeeperExtension());
@@ -135,8 +134,8 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
                         configuration,
                         
testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
         CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
-        LeaderElectionDriverFactory leaderElectionDriverFactory =
-                new ZooKeeperLeaderElectionDriverFactory(client, PATH);
+        MultipleComponentLeaderElectionDriverFactory 
leaderElectionDriverFactory =
+                new 
ZooKeeperMultipleComponentLeaderElectionDriverFactory(client);
         DefaultLeaderElectionService leaderElectionService =
                 new DefaultLeaderElectionService(
                         leaderElectionDriverFactory,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 013530f81c9..cb69d177eb6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -78,7 +78,7 @@ import static org.mockito.Mockito.when;
  * org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver}. 
To directly test the
  * {@link ZooKeeperMultipleComponentLeaderElectionDriver} and {@link
  * org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver}, 
some simple tests will
- * use {@link TestingLeaderElectionEventHandler} which will not write the 
leader information to
+ * use {@link TestingLeaderElectionListener} which will not write the leader 
information to
  * ZooKeeper. For the complicated tests(e.g. multiple leaders), we will use 
{@link
  * DefaultLeaderElectionService} with {@link TestingContender}.
  */
@@ -165,17 +165,20 @@ class ZooKeeperLeaderElectionTest {
 
         try {
             leaderRetrievalService =
-                    
ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperClient());
+                    ZooKeeperUtils.createLeaderRetrievalService(
+                            createZooKeeperClient(), CONTENDER_ID, new 
Configuration());
 
             LOG.debug("Start leader retrieval service for the 
TestingListener.");
 
             leaderRetrievalService.start(listener);
 
             for (int i = 0; i < num; i++) {
-                leaderElectionService[i] =
-                        
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
-                leaderElections[i] =
-                        
leaderElectionService[i].createLeaderElection("random-contender-id");
+                final MultipleComponentLeaderElectionDriverFactory 
driverFactory =
+                        new 
ZooKeeperMultipleComponentLeaderElectionDriverFactory(
+                                createZooKeeperClient());
+                leaderElectionService[i] = new 
DefaultLeaderElectionService(driverFactory);
+                leaderElectionService[i].startLeaderElectionBackend();
+                leaderElections[i] = 
leaderElectionService[i].createLeaderElection(CONTENDER_ID);
                 contenders[i] = new TestingContender(createAddress(i), 
leaderElections[i]);
 
                 LOG.debug("Start leader election service for contender #{}.", 
i);
@@ -266,13 +269,17 @@ class ZooKeeperLeaderElectionTest {
 
         try {
             leaderRetrievalService =
-                    
ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperClient());
+                    ZooKeeperUtils.createLeaderRetrievalService(
+                            createZooKeeperClient(), CONTENDER_ID, new 
Configuration());
 
             leaderRetrievalService.start(listener);
 
             for (int i = 0; i < num; i++) {
-                leaderElectionService[i] =
-                        
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
+                final MultipleComponentLeaderElectionDriverFactory 
driverFactory =
+                        new 
ZooKeeperMultipleComponentLeaderElectionDriverFactory(
+                                createZooKeeperClient());
+                leaderElectionService[i] = new 
DefaultLeaderElectionService(driverFactory);
+                leaderElectionService[i].startLeaderElectionBackend();
                 leaderElections[i] = 
leaderElectionService[i].createLeaderElection(CONTENDER_ID);
                 contenders[i] =
                         new TestingContender(LEADER_ADDRESS + "_" + i + "_0", 
leaderElections[i]);
@@ -304,8 +311,11 @@ class ZooKeeperLeaderElectionTest {
                     leaderElections[index] = null;
 
                     // create new leader election service which takes part in 
the leader election
-                    leaderElectionService[index] =
-                            
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
+                    final MultipleComponentLeaderElectionDriverFactory 
driverFactory =
+                            new 
ZooKeeperMultipleComponentLeaderElectionDriverFactory(
+                                    createZooKeeperClient());
+                    leaderElectionService[index] = new 
DefaultLeaderElectionService(driverFactory);
+                    leaderElectionService[index].startLeaderElectionBackend();
                     leaderElections[index] =
                             
leaderElectionService[index].createLeaderElection(CONTENDER_ID);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
index 66b4918259d..3747bdbfb47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
@@ -392,7 +392,7 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
         }
 
         @Override
-        public void isLeader() {
+        public void isLeader(UUID ignoredSessionID) {
             leadershipFuture.complete(null);
         }
 


Reply via email to