This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a410899e2c [FLINK-32180][runtime] Moves error handling from DefaultMultipleComponentLeaderElectionService into the MultipleComponentLeaderElectionDriver implementations (#22656)
6a410899e2c is described below

commit 6a410899e2c57059f9944e8dd35742efa135838e
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Tue Jun 20 08:14:17 2023 +0200

    [FLINK-32180][runtime] Moves error handling from DefaultMultipleComponentLeaderElectionService into the MultipleComponentLeaderElectionDriver implementations (#22656)
    
    The error handling needs to be moved into the driver to allow errors that were reported to the
    k8s watcher to be forwarded to the LeaderContender's error handling.
    
    Signed-off-by: Matthias Pohl <matthias.p...@aiven.io>
---
 ...netesMultipleComponentLeaderElectionDriver.java | 20 ++++++-----
 ...ltipleComponentLeaderElectionDriverFactory.java |  9 ++---
 ...sMultipleComponentLeaderElectionHaServices.java |  3 +-
 ...aultMultipleComponentLeaderElectionService.java | 20 ++---------
 .../MultipleComponentLeaderElectionDriver.java     |  7 ++--
 ...ltipleComponentLeaderElectionDriverFactory.java |  7 +++-
 ...eeperMultipleComponentLeaderElectionDriver.java | 34 ++++++++++++------
 ...ltipleComponentLeaderElectionDriverFactory.java |  6 ++--
 ...MultipleComponentLeaderElectionServiceTest.java | 27 +++++++++++++++
 ...stingMultipleComponentLeaderElectionDriver.java | 40 ++++++++++++----------
 ...ltipleComponentLeaderElectionDriverFactory.java |  8 +++--
 ...rMultipleComponentLeaderElectionDriverTest.java | 29 ++++++++++++----
 .../ZooKeeperLeaderRetrievalTest.java              |  3 +-
 13 files changed, 133 insertions(+), 80 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
index 99af8b573c5..98deaa8cbfa 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -137,15 +138,18 @@ public class 
KubernetesMultipleComponentLeaderElectionDriver
     }
 
     @Override
-    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation)
-            throws Exception {
+    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation) {
         Preconditions.checkState(running.get());
 
-        kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
-                        updateConfigMapWithLeaderInformation(componentId, 
leaderInformation))
-                .get();
+        try {
+            kubeClient
+                    .checkAndUpdateConfigMap(
+                            configMapName,
+                            updateConfigMapWithLeaderInformation(componentId, 
leaderInformation))
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            fatalErrorHandler.onFatalError(e);
+        }
 
         LOG.debug(
                 "Successfully wrote leader information {} for leader {} into 
the config map {}.",
@@ -155,7 +159,7 @@ public class KubernetesMultipleComponentLeaderElectionDriver
     }
 
     @Override
-    public void deleteLeaderInformation(String componentId) throws Exception {
+    public void deleteLeaderInformation(String componentId) {
         publishLeaderInformation(componentId, LeaderInformation.empty());
     }
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java
index 7dd10e8ada3..8bc21949663 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java
@@ -39,25 +39,22 @@ public class 
KubernetesMultipleComponentLeaderElectionDriverFactory
 
     private final Executor watchExecutor;
 
-    private final FatalErrorHandler fatalErrorHandler;
-
     public KubernetesMultipleComponentLeaderElectionDriverFactory(
             FlinkKubeClient kubeClient,
             KubernetesLeaderElectionConfiguration 
kubernetesLeaderElectionConfiguration,
             KubernetesConfigMapSharedWatcher configMapSharedWatcher,
-            Executor watchExecutor,
-            FatalErrorHandler fatalErrorHandler) {
+            Executor watchExecutor) {
         this.kubeClient = Preconditions.checkNotNull(kubeClient);
         this.kubernetesLeaderElectionConfiguration =
                 
Preconditions.checkNotNull(kubernetesLeaderElectionConfiguration);
         this.configMapSharedWatcher = 
Preconditions.checkNotNull(configMapSharedWatcher);
         this.watchExecutor = Preconditions.checkNotNull(watchExecutor);
-        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
     }
 
     @Override
     public KubernetesMultipleComponentLeaderElectionDriver create(
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener)
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
             throws Exception {
         return new KubernetesMultipleComponentLeaderElectionDriver(
                 kubernetesLeaderElectionConfiguration,
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
index 8438d238509..9092bf1eac5 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
@@ -128,8 +128,7 @@ public class 
KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac
                                             kubeClient,
                                             leaderElectionConfiguration,
                                             configMapSharedWatcher,
-                                            watchExecutorService,
-                                            fatalErrorHandler));
+                                            watchExecutorService));
                 } catch (Exception e) {
                     throw new FlinkRuntimeException(
                             "Could not initialize the default single leader 
election service.", e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
index 4c1a68d7b17..148694e8e78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -53,8 +52,6 @@ public class DefaultMultipleComponentLeaderElectionService
 
     private final MultipleComponentLeaderElectionDriver 
multipleComponentLeaderElectionDriver;
 
-    private final FatalErrorHandler fatalErrorHandler;
-
     @GuardedBy("lock")
     private final ExecutorService leadershipOperationExecutor;
 
@@ -75,14 +72,12 @@ public class DefaultMultipleComponentLeaderElectionService
                     multipleComponentLeaderElectionDriverFactory,
             ExecutorService leadershipOperationExecutor)
             throws Exception {
-        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
-
         this.leadershipOperationExecutor = 
Preconditions.checkNotNull(leadershipOperationExecutor);
 
         leaderElectionEventHandlers = new HashMap<>();
 
         multipleComponentLeaderElectionDriver =
-                multipleComponentLeaderElectionDriverFactory.create(this);
+                multipleComponentLeaderElectionDriverFactory.create(this, 
fatalErrorHandler);
     }
 
     public DefaultMultipleComponentLeaderElectionService(
@@ -120,17 +115,8 @@ public class DefaultMultipleComponentLeaderElectionService
 
     @Override
     public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation) {
-        try {
-            multipleComponentLeaderElectionDriver.publishLeaderInformation(
-                    componentId, leaderInformation);
-        } catch (Exception e) {
-            fatalErrorHandler.onFatalError(
-                    new FlinkException(
-                            String.format(
-                                    "Could not write leader information %s for 
leader %s.",
-                                    leaderInformation, componentId),
-                            e));
-        }
+        multipleComponentLeaderElectionDriver.publishLeaderInformation(
+                componentId, leaderInformation);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
index b48686debcd..b4c99ca428b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java
@@ -42,18 +42,15 @@ public interface MultipleComponentLeaderElectionDriver {
      *
      * @param componentId identifying the component for which to publish the 
leader information
      * @param leaderInformation leader information of the respective component
-     * @throws Exception if publishing fails
      */
-    void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation)
-            throws Exception;
+    void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation);
 
     /**
      * Deletes the leader information for the given component.
      *
      * @param componentId identifying the component for which to delete the 
leader information
-     * @throws Exception if deleting fails
      */
-    void deleteLeaderInformation(String componentId) throws Exception;
+    void deleteLeaderInformation(String componentId);
 
     /**
      * Listener interface for state changes of the {@link 
MultipleComponentLeaderElectionDriver}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java
index c02ccbb7e26..547c3498189 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
 /** Factory for {@link MultipleComponentLeaderElectionDriver}. */
 public interface MultipleComponentLeaderElectionDriverFactory {
 
@@ -27,9 +29,12 @@ public interface 
MultipleComponentLeaderElectionDriverFactory {
      *
      * @param leaderElectionListener listener for the callbacks of the {@link
      *     MultipleComponentLeaderElectionDriver}
+     * @param fatalErrorHandler component for handling fatal errors.
      * @return created {@link MultipleComponentLeaderElectionDriver} instance
      * @throws Exception if the creation fails
      */
     MultipleComponentLeaderElectionDriver create(
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener) throws Exception;
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
+            throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
index 97f885844f9..c028f29ec95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -48,6 +49,8 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
 
     private final MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener;
 
+    private final FatalErrorHandler fatalErrorHandler;
+
     private final LeaderLatch leaderLatch;
 
     private final TreeCache treeCache;
@@ -59,10 +62,12 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
 
     public ZooKeeperMultipleComponentLeaderElectionDriver(
             CuratorFramework curatorFramework,
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener)
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
             throws Exception {
         this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
         this.leaderElectionListener = 
Preconditions.checkNotNull(leaderElectionListener);
+        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
 
         this.leaderLatch = new LeaderLatch(curatorFramework, 
ZooKeeperUtils.getLeaderLatchPath());
         this.treeCache =
@@ -130,8 +135,7 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
     }
 
     @Override
-    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation)
-            throws Exception {
+    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation) {
         Preconditions.checkState(running.get());
 
         if (LOG.isDebugEnabled()) {
@@ -145,17 +149,25 @@ public class 
ZooKeeperMultipleComponentLeaderElectionDriver
         final String connectionInformationPath =
                 ZooKeeperUtils.generateConnectionInformationPath(componentId);
 
-        ZooKeeperUtils.writeLeaderInformationToZooKeeper(
-                leaderInformation,
-                curatorFramework,
-                leaderLatch::hasLeadership,
-                connectionInformationPath);
+        try {
+            ZooKeeperUtils.writeLeaderInformationToZooKeeper(
+                    leaderInformation,
+                    curatorFramework,
+                    leaderLatch::hasLeadership,
+                    connectionInformationPath);
+        } catch (Exception e) {
+            fatalErrorHandler.onFatalError(e);
+        }
     }
 
     @Override
-    public void deleteLeaderInformation(String leaderName) throws Exception {
-        ZooKeeperUtils.deleteZNode(
-                curatorFramework, 
ZooKeeperUtils.generateZookeeperPath(leaderName));
+    public void deleteLeaderInformation(String leaderName) {
+        try {
+            ZooKeeperUtils.deleteZNode(
+                    curatorFramework, 
ZooKeeperUtils.generateZookeeperPath(leaderName));
+        } catch (Exception e) {
+            fatalErrorHandler.onFatalError(e);
+        }
     }
 
     private void handleStateChange(ConnectionState newState) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java
index ed2dffdad4c..e02cc0d9fb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
@@ -35,9 +36,10 @@ public class 
ZooKeeperMultipleComponentLeaderElectionDriverFactory
 
     @Override
     public ZooKeeperMultipleComponentLeaderElectionDriver create(
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener)
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
             throws Exception {
         return new ZooKeeperMultipleComponentLeaderElectionDriver(
-                curatorFramework, leaderElectionListener);
+                curatorFramework, leaderElectionListener, fatalErrorHandler);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
index c515a523540..7d7bd947dc6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
 import org.apache.flink.util.ExceptionUtils;
@@ -117,6 +118,32 @@ class DefaultMultipleComponentLeaderElectionServiceTest {
         }
     }
 
+    @Test
+    void handleFatalError() throws Exception {
+        final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
+                
TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
+
+        final DefaultMultipleComponentLeaderElectionService 
leaderElectionService =
+                
createDefaultMultiplexingLeaderElectionService(leaderElectionDriver);
+
+        try {
+            final Throwable expectedFatalError =
+                    new Exception("Expected exception simulating a fatal 
error.");
+
+            leaderElectionDriver.triggerErrorHandling(expectedFatalError);
+
+            FlinkAssertions.assertThatFuture(
+                            fatalErrorHandlerExtension
+                                    .getTestingFatalErrorHandler()
+                                    .getErrorFuture())
+                    .eventuallySucceeds()
+                    .isEqualTo(expectedFatalError);
+        } finally {
+            leaderElectionService.close();
+            
fatalErrorHandlerExtension.getTestingFatalErrorHandler().clearError();
+        }
+    }
+
     @Test
     void unregisteredEventHandlersAreNotNotified() throws Exception {
         final TestingMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
index 5b937929d41..6462f34e23c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java
@@ -18,31 +18,32 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.BiConsumerWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 /** Testing implementation of {@link MultipleComponentLeaderElectionDriver}. */
 public class TestingMultipleComponentLeaderElectionDriver
         implements MultipleComponentLeaderElectionDriver {
 
-    private final BiConsumerWithException<String, LeaderInformation, Exception>
-            publishLeaderInformationConsumer;
-    private final ThrowingConsumer<String, Exception> 
deleteLeaderInformationConsumer;
+    private final BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer;
+    private final Consumer<String> deleteLeaderInformationConsumer;
     private boolean hasLeadership;
 
     private Optional<Listener> listener;
+    private Optional<FatalErrorHandler> fatalErrorHandler;
 
     private TestingMultipleComponentLeaderElectionDriver(
-            BiConsumerWithException<String, LeaderInformation, Exception>
-                    publishLeaderInformationConsumer,
-            ThrowingConsumer<String, Exception> 
deleteLeaderInformationConsumer) {
+            BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer,
+            Consumer<String> deleteLeaderInformationConsumer) {
         this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
         this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer;
         hasLeadership = false;
         listener = Optional.empty();
+        fatalErrorHandler = Optional.empty();
     }
 
     public void grantLeadership() {
@@ -59,9 +60,14 @@ public class TestingMultipleComponentLeaderElectionDriver
         }
     }
 
-    public void setListener(Listener listener) {
+    public void triggerErrorHandling(Throwable throwable) {
+        fatalErrorHandler.ifPresent(handler -> 
handler.onFatalError(throwable));
+    }
+
+    public void initializeDriver(Listener listener, FatalErrorHandler 
fatalErrorHandler) {
         Preconditions.checkState(!this.listener.isPresent(), "Can only set a 
single listener.");
         this.listener = Optional.of(listener);
+        this.fatalErrorHandler = Optional.of(fatalErrorHandler);
     }
 
     @Override
@@ -73,13 +79,12 @@ public class TestingMultipleComponentLeaderElectionDriver
     }
 
     @Override
-    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation)
-            throws Exception {
+    public void publishLeaderInformation(String componentId, LeaderInformation 
leaderInformation) {
         publishLeaderInformationConsumer.accept(componentId, 
leaderInformation);
     }
 
     @Override
-    public void deleteLeaderInformation(String componentId) throws Exception {
+    public void deleteLeaderInformation(String componentId) {
         deleteLeaderInformationConsumer.accept(componentId);
     }
 
@@ -88,19 +93,18 @@ public class TestingMultipleComponentLeaderElectionDriver
     }
 
     public static final class Builder {
-        private BiConsumerWithException<String, LeaderInformation, Exception>
-                publishLeaderInformationConsumer = (ignoredA, ignoredB) -> {};
-        private ThrowingConsumer<String, Exception> 
deleteLeaderInformationConsumer = ignored -> {};
+        private BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer =
+                (ignoredA, ignoredB) -> {};
+        private Consumer<String> deleteLeaderInformationConsumer = ignored -> 
{};
 
         public Builder setPublishLeaderInformationConsumer(
-                BiConsumerWithException<String, LeaderInformation, Exception>
-                        publishLeaderInformationConsumer) {
+                BiConsumer<String, LeaderInformation> 
publishLeaderInformationConsumer) {
             this.publishLeaderInformationConsumer = 
publishLeaderInformationConsumer;
             return this;
         }
 
         public Builder setDeleteLeaderInformationConsumer(
-                ThrowingConsumer<String, Exception> 
deleteLeaderInformationConsumer) {
+                Consumer<String> deleteLeaderInformationConsumer) {
             this.deleteLeaderInformationConsumer = 
deleteLeaderInformationConsumer;
             return this;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
index fe902e2ea0d..bb27d9dd29e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
 /**
  * Testing implementation of {@link 
MultipleComponentLeaderElectionDriverFactory} that returns a
  * given {@link MultipleComponentLeaderElectionDriver}.
@@ -36,9 +38,11 @@ public class 
TestingMultipleComponentLeaderElectionDriverFactory
 
     @Override
     public MultipleComponentLeaderElectionDriver create(
-            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener)
+            MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
+            FatalErrorHandler fatalErrorHandler)
             throws Exception {
-        
testingMultipleComponentLeaderElectionDriver.setListener(leaderElectionListener);
+        testingMultipleComponentLeaderElectionDriver.initializeDriver(
+                leaderElectionListener, fatalErrorHandler);
 
         return testingMultipleComponentLeaderElectionDriver;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
index 31b6b83cc21..ea90de9b976 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriverFactory;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
 import org.apache.flink.util.ExceptionUtils;
@@ -56,6 +58,10 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
     private final EachCallbackWrapper<ZooKeeperExtension> eachWrapper =
             new EachCallbackWrapper<>(zooKeeperExtension);
 
+    @RegisterExtension
+    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
+            new TestingFatalErrorHandlerExtension();
+
     @Test
     void testElectionDriverGainsLeadershipAtStartup() throws Exception {
         new Context() {
@@ -169,7 +175,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
 
                                 otherLeaderElectionDriver =
                                         createLeaderElectionDriver(
-                                                
curatorFramework.asCuratorFramework());
+                                                
curatorFramework.asCuratorFramework(),
+                                                
testingFatalErrorHandlerResource
+                                                        
.getTestingFatalErrorHandler());
 
                                 
assertThat(otherLeaderElectionDriver.hasLeadership()).isFalse();
 
@@ -237,7 +245,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
                     Stream.generate(
                                     () ->
                                             createLeaderElectionDriver(
-                                                    
curatorFramework.asCuratorFramework()))
+                                                    
curatorFramework.asCuratorFramework(),
+                                                    
testingFatalErrorHandlerResource
+                                                            
.getTestingFatalErrorHandler()))
                             .limit(3)
                             .collect(Collectors.toSet());
 
@@ -327,13 +337,15 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
         };
     }
 
-    private static ElectionDriver createLeaderElectionDriver(CuratorFramework 
curatorFramework) {
+    private static ElectionDriver createLeaderElectionDriver(
+            CuratorFramework curatorFramework, FatalErrorHandler 
fatalErrorHandler) {
         final SimpleLeaderElectionListener leaderElectionListener =
                 new SimpleLeaderElectionListener();
 
         try {
             final ZooKeeperMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
-                    createLeaderElectionDriver(leaderElectionListener, 
curatorFramework);
+                    createLeaderElectionDriver(
+                            leaderElectionListener, curatorFramework, 
fatalErrorHandler);
             return new ElectionDriver(leaderElectionDriver, 
leaderElectionListener);
         } catch (Exception e) {
             ExceptionUtils.rethrow(e);
@@ -398,10 +410,11 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
 
     private static ZooKeeperMultipleComponentLeaderElectionDriver 
createLeaderElectionDriver(
             MultipleComponentLeaderElectionDriver.Listener 
leaderElectionListener,
-            CuratorFramework curatorFramework)
+            CuratorFramework curatorFramework,
+            FatalErrorHandler fatalErrorHandler)
             throws Exception {
         return new ZooKeeperMultipleComponentLeaderElectionDriver(
-                curatorFramework, leaderElectionListener);
+                curatorFramework, leaderElectionListener, fatalErrorHandler);
     }
 
     private CuratorFrameworkWithUnhandledErrorListener startCuratorFramework() 
{
@@ -421,7 +434,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest {
             this.curatorFramework = startCuratorFramework();
             this.leaderElectionDriver =
                     createLeaderElectionDriver(
-                            leaderElectionListener, 
curatorFramework.asCuratorFramework());
+                            leaderElectionListener,
+                            curatorFramework.asCuratorFramework(),
+                            
testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
         }
 
         protected final void runTest(RunnableWithException test) throws 
Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index bd968bd8754..1ae648419f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -162,7 +162,8 @@ class ZooKeeperLeaderRetrievalTest {
                                                 
testingFatalErrorHandlerResource
                                                         
.getTestingFatalErrorHandler()),
                                         
ZooKeeperUtils.generateLeaderLatchPath("")),
-                                new TestingLeaderElectionListener());
+                                new TestingLeaderElectionListener(),
+                                
testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
                 externalProcessDriver.isLeader();
 
                 externalProcessDriver.publishLeaderInformation(

Reply via email to