This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6a410899e2c [FLINK-32180][runtime] Moves error handling from DefaultMultipleComponentLeaderElectionService into the MultipleComponentLeaderElectionDriver implementations (#22656) 6a410899e2c is described below commit 6a410899e2c57059f9944e8dd35742efa135838e Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Tue Jun 20 08:14:17 2023 +0200 [FLINK-32180][runtime] Moves error handling from DefaultMultipleComponentLeaderElectionService into the MultipleComponentLeaderElectionDriver implementations (#22656) The error handling needs to be moved into the driver to allow errors that were reported to the k8s watcher to be forwarded to the LeaderContender's error handling. Signed-off-by: Matthias Pohl <matthias.p...@aiven.io> --- ...netesMultipleComponentLeaderElectionDriver.java | 20 ++++++----- ...ltipleComponentLeaderElectionDriverFactory.java | 9 ++--- ...sMultipleComponentLeaderElectionHaServices.java | 3 +- ...aultMultipleComponentLeaderElectionService.java | 20 ++--------- .../MultipleComponentLeaderElectionDriver.java | 7 ++-- ...ltipleComponentLeaderElectionDriverFactory.java | 7 +++- ...eeperMultipleComponentLeaderElectionDriver.java | 34 ++++++++++++------ ...ltipleComponentLeaderElectionDriverFactory.java | 6 ++-- ...MultipleComponentLeaderElectionServiceTest.java | 27 +++++++++++++++ ...stingMultipleComponentLeaderElectionDriver.java | 40 ++++++++++++---------- ...ltipleComponentLeaderElectionDriverFactory.java | 8 +++-- ...rMultipleComponentLeaderElectionDriverTest.java | 29 ++++++++++++---- .../ZooKeeperLeaderRetrievalTest.java | 3 +- 13 files changed, 133 insertions(+), 80 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java index 99af8b573c5..98deaa8cbfa 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriver.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -137,15 +138,18 @@ public class KubernetesMultipleComponentLeaderElectionDriver } @Override - public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) - throws Exception { + public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) { Preconditions.checkState(running.get()); - kubeClient - .checkAndUpdateConfigMap( - configMapName, - updateConfigMapWithLeaderInformation(componentId, leaderInformation)) - .get(); + try { + kubeClient + .checkAndUpdateConfigMap( + configMapName, + updateConfigMapWithLeaderInformation(componentId, leaderInformation)) + .get(); + } catch (InterruptedException | ExecutionException e) { + fatalErrorHandler.onFatalError(e); + } LOG.debug( "Successfully wrote leader information {} for leader {} into the config map {}.", @@ -155,7 +159,7 @@ public class KubernetesMultipleComponentLeaderElectionDriver } @Override - public void deleteLeaderInformation(String componentId) throws Exception { + public void deleteLeaderInformation(String componentId) { publishLeaderInformation(componentId, LeaderInformation.empty()); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java index 7dd10e8ada3..8bc21949663 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionDriverFactory.java @@ -39,25 +39,22 @@ public class KubernetesMultipleComponentLeaderElectionDriverFactory private final Executor watchExecutor; - private final FatalErrorHandler fatalErrorHandler; - public KubernetesMultipleComponentLeaderElectionDriverFactory( FlinkKubeClient kubeClient, KubernetesLeaderElectionConfiguration kubernetesLeaderElectionConfiguration, KubernetesConfigMapSharedWatcher configMapSharedWatcher, - Executor watchExecutor, - FatalErrorHandler fatalErrorHandler) { + Executor watchExecutor) { this.kubeClient = Preconditions.checkNotNull(kubeClient); this.kubernetesLeaderElectionConfiguration = Preconditions.checkNotNull(kubernetesLeaderElectionConfiguration); this.configMapSharedWatcher = Preconditions.checkNotNull(configMapSharedWatcher); this.watchExecutor = Preconditions.checkNotNull(watchExecutor); - this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); } @Override public KubernetesMultipleComponentLeaderElectionDriver create( - MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) + MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, + FatalErrorHandler fatalErrorHandler) throws Exception { return new KubernetesMultipleComponentLeaderElectionDriver( kubernetesLeaderElectionConfiguration, diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java index 8438d238509..9092bf1eac5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java @@ -128,8 +128,7 @@ public class KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac kubeClient, leaderElectionConfiguration, configMapSharedWatcher, - watchExecutorService, - fatalErrorHandler)); + watchExecutorService)); } catch (Exception e) { throw new FlinkRuntimeException( "Could not initialize the default single leader election service.", e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java index 4c1a68d7b17..148694e8e78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.util.ExecutorUtils; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -53,8 +52,6 @@ public class DefaultMultipleComponentLeaderElectionService private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver; - private final FatalErrorHandler fatalErrorHandler; - @GuardedBy("lock") private final ExecutorService leadershipOperationExecutor; @@ -75,14 +72,12 @@ public class DefaultMultipleComponentLeaderElectionService multipleComponentLeaderElectionDriverFactory, ExecutorService leadershipOperationExecutor) throws Exception { - this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); - this.leadershipOperationExecutor = Preconditions.checkNotNull(leadershipOperationExecutor); leaderElectionEventHandlers = new HashMap<>(); multipleComponentLeaderElectionDriver = - multipleComponentLeaderElectionDriverFactory.create(this); + multipleComponentLeaderElectionDriverFactory.create(this, fatalErrorHandler); } public DefaultMultipleComponentLeaderElectionService( @@ -120,17 +115,8 @@ public class DefaultMultipleComponentLeaderElectionService @Override public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) { - try { - multipleComponentLeaderElectionDriver.publishLeaderInformation( - componentId, leaderInformation); - } catch (Exception e) { - fatalErrorHandler.onFatalError( - new FlinkException( - String.format( - "Could not write leader information %s for leader %s.", - leaderInformation, componentId), - e)); - } + multipleComponentLeaderElectionDriver.publishLeaderInformation( + componentId, leaderInformation); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java index b48686debcd..b4c99ca428b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriver.java @@ -42,18 +42,15 @@ public interface MultipleComponentLeaderElectionDriver { * * @param componentId identifying the component for which to publish the leader information * @param leaderInformation leader information of the respective component - * @throws Exception if publishing fails */ - void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) - throws Exception; + void publishLeaderInformation(String componentId, LeaderInformation leaderInformation); /** * Deletes the leader information for the given component. * * @param componentId identifying the component for which to delete the leader information - * @throws Exception if deleting fails */ - void deleteLeaderInformation(String componentId) throws Exception; + void deleteLeaderInformation(String componentId); /** * Listener interface for state changes of the {@link MultipleComponentLeaderElectionDriver}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java index c02ccbb7e26..547c3498189 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/MultipleComponentLeaderElectionDriverFactory.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; + /** Factory for {@link MultipleComponentLeaderElectionDriver}. */ public interface MultipleComponentLeaderElectionDriverFactory { @@ -27,9 +29,12 @@ public interface MultipleComponentLeaderElectionDriverFactory { * * @param leaderElectionListener listener for the callbacks of the {@link * MultipleComponentLeaderElectionDriver} + * @param fatalErrorHandler component for handling fatal errors. * @return created {@link MultipleComponentLeaderElectionDriver} instance * @throws Exception if the creation fails */ MultipleComponentLeaderElectionDriver create( - MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) throws Exception; + MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, + FatalErrorHandler fatalErrorHandler) + throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java index 97f885844f9..c028f29ec95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -48,6 +49,8 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver private final MultipleComponentLeaderElectionDriver.Listener leaderElectionListener; + private final FatalErrorHandler fatalErrorHandler; + private final LeaderLatch leaderLatch; private final TreeCache treeCache; @@ -59,10 +62,12 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver public ZooKeeperMultipleComponentLeaderElectionDriver( CuratorFramework curatorFramework, - MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) + MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, + FatalErrorHandler fatalErrorHandler) throws Exception { this.curatorFramework = Preconditions.checkNotNull(curatorFramework); this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener); + this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler); this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath()); this.treeCache = @@ -130,8 +135,7 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver } @Override - public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) - throws Exception { + public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) { Preconditions.checkState(running.get()); if (LOG.isDebugEnabled()) { @@ -145,17 +149,25 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver final String connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(componentId); - ZooKeeperUtils.writeLeaderInformationToZooKeeper( - leaderInformation, - curatorFramework, - leaderLatch::hasLeadership, - connectionInformationPath); + try { + ZooKeeperUtils.writeLeaderInformationToZooKeeper( + leaderInformation, + curatorFramework, + leaderLatch::hasLeadership, + connectionInformationPath); + } catch (Exception e) { + fatalErrorHandler.onFatalError(e); + } } @Override - public void deleteLeaderInformation(String leaderName) throws Exception { - ZooKeeperUtils.deleteZNode( - curatorFramework, ZooKeeperUtils.generateZookeeperPath(leaderName)); + public void deleteLeaderInformation(String leaderName) { + try { + ZooKeeperUtils.deleteZNode( + curatorFramework, ZooKeeperUtils.generateZookeeperPath(leaderName)); + } catch (Exception e) { + fatalErrorHandler.onFatalError(e); + } } private void handleStateChange(ConnectionState newState) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java index ed2dffdad4c..e02cc0d9fb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework; @@ -35,9 +36,10 @@ public class ZooKeeperMultipleComponentLeaderElectionDriverFactory @Override public ZooKeeperMultipleComponentLeaderElectionDriver create( - MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) + MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, + FatalErrorHandler fatalErrorHandler) throws Exception { return new ZooKeeperMultipleComponentLeaderElectionDriver( - curatorFramework, leaderElectionListener); + curatorFramework, leaderElectionListener, fatalErrorHandler); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java index c515a523540..7d7bd947dc6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionServiceTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension; import org.apache.flink.util.ExceptionUtils; @@ -117,6 +118,32 @@ class DefaultMultipleComponentLeaderElectionServiceTest { } } + @Test + void handleFatalError() throws Exception { + final TestingMultipleComponentLeaderElectionDriver leaderElectionDriver = + TestingMultipleComponentLeaderElectionDriver.newBuilder().build(); + + final DefaultMultipleComponentLeaderElectionService leaderElectionService = + createDefaultMultiplexingLeaderElectionService(leaderElectionDriver); + + try { + final Throwable expectedFatalError = + new Exception("Expected exception simulating a fatal error."); + + leaderElectionDriver.triggerErrorHandling(expectedFatalError); + + FlinkAssertions.assertThatFuture( + fatalErrorHandlerExtension + .getTestingFatalErrorHandler() + .getErrorFuture()) + .eventuallySucceeds() + .isEqualTo(expectedFatalError); + } finally { + leaderElectionService.close(); + fatalErrorHandlerExtension.getTestingFatalErrorHandler().clearError(); + } + } + @Test void unregisteredEventHandlersAreNotNotified() throws Exception { final TestingMultipleComponentLeaderElectionDriver leaderElectionDriver = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java index 5b937929d41..6462f34e23c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriver.java @@ -18,31 +18,32 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.function.BiConsumerWithException; -import org.apache.flink.util.function.ThrowingConsumer; import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Consumer; /** Testing implementation of {@link MultipleComponentLeaderElectionDriver}. */ public class TestingMultipleComponentLeaderElectionDriver implements MultipleComponentLeaderElectionDriver { - private final BiConsumerWithException<String, LeaderInformation, Exception> - publishLeaderInformationConsumer; - private final ThrowingConsumer<String, Exception> deleteLeaderInformationConsumer; + private final BiConsumer<String, LeaderInformation> publishLeaderInformationConsumer; + private final Consumer<String> deleteLeaderInformationConsumer; private boolean hasLeadership; private Optional<Listener> listener; + private Optional<FatalErrorHandler> fatalErrorHandler; private TestingMultipleComponentLeaderElectionDriver( - BiConsumerWithException<String, LeaderInformation, Exception> - publishLeaderInformationConsumer, - ThrowingConsumer<String, Exception> deleteLeaderInformationConsumer) { + BiConsumer<String, LeaderInformation> publishLeaderInformationConsumer, + Consumer<String> deleteLeaderInformationConsumer) { this.publishLeaderInformationConsumer = publishLeaderInformationConsumer; this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer; hasLeadership = false; listener = Optional.empty(); + fatalErrorHandler = Optional.empty(); } public void grantLeadership() { @@ -59,9 +60,14 @@ public class TestingMultipleComponentLeaderElectionDriver } } - public void setListener(Listener listener) { + public void triggerErrorHandling(Throwable throwable) { + fatalErrorHandler.ifPresent(handler -> handler.onFatalError(throwable)); + } + + public void initializeDriver(Listener listener, FatalErrorHandler fatalErrorHandler) { Preconditions.checkState(!this.listener.isPresent(), "Can only set a single listener."); this.listener = Optional.of(listener); + this.fatalErrorHandler = Optional.of(fatalErrorHandler); } @Override @@ -73,13 +79,12 @@ public class TestingMultipleComponentLeaderElectionDriver } @Override - public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) - throws Exception { + public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) { publishLeaderInformationConsumer.accept(componentId, leaderInformation); } @Override - public void deleteLeaderInformation(String componentId) throws Exception { + public void deleteLeaderInformation(String componentId) { deleteLeaderInformationConsumer.accept(componentId); } @@ -88,19 +93,18 @@ public class TestingMultipleComponentLeaderElectionDriver } public static final class Builder { - private BiConsumerWithException<String, LeaderInformation, Exception> - publishLeaderInformationConsumer = (ignoredA, ignoredB) -> {}; - private ThrowingConsumer<String, Exception> deleteLeaderInformationConsumer = ignored -> {}; + private BiConsumer<String, LeaderInformation> publishLeaderInformationConsumer = + (ignoredA, ignoredB) -> {}; + private Consumer<String> deleteLeaderInformationConsumer = ignored -> {}; public Builder setPublishLeaderInformationConsumer( - BiConsumerWithException<String, LeaderInformation, Exception> - publishLeaderInformationConsumer) { + BiConsumer<String, LeaderInformation> publishLeaderInformationConsumer) { this.publishLeaderInformationConsumer = publishLeaderInformationConsumer; return this; } public Builder setDeleteLeaderInformationConsumer( - ThrowingConsumer<String, Exception> deleteLeaderInformationConsumer) { + Consumer<String> deleteLeaderInformationConsumer) { this.deleteLeaderInformationConsumer = deleteLeaderInformationConsumer; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java index fe902e2ea0d..bb27d9dd29e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingMultipleComponentLeaderElectionDriverFactory.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.rpc.FatalErrorHandler; + /** * Testing implementation of {@link MultipleComponentLeaderElectionDriverFactory} that returns a * given {@link MultipleComponentLeaderElectionDriver}. @@ -36,9 +38,11 @@ public class TestingMultipleComponentLeaderElectionDriverFactory @Override public MultipleComponentLeaderElectionDriver create( - MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) + MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, + FatalErrorHandler fatalErrorHandler) throws Exception { - testingMultipleComponentLeaderElectionDriver.setListener(leaderElectionListener); + testingMultipleComponentLeaderElectionDriver.initializeDriver( + leaderElectionListener, fatalErrorHandler); return testingMultipleComponentLeaderElectionDriver; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java index 31b6b83cc21..ea90de9b976 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriverTest.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriverFactory; import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; import org.apache.flink.util.ExceptionUtils; @@ -56,6 +58,10 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { private final EachCallbackWrapper<ZooKeeperExtension> eachWrapper = new EachCallbackWrapper<>(zooKeeperExtension); + @RegisterExtension + final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = + new TestingFatalErrorHandlerExtension(); + @Test void testElectionDriverGainsLeadershipAtStartup() throws Exception { new Context() { @@ -169,7 +175,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { otherLeaderElectionDriver = createLeaderElectionDriver( - curatorFramework.asCuratorFramework()); + curatorFramework.asCuratorFramework(), + testingFatalErrorHandlerResource + .getTestingFatalErrorHandler()); assertThat(otherLeaderElectionDriver.hasLeadership()).isFalse(); @@ -237,7 +245,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { Stream.generate( () -> createLeaderElectionDriver( - curatorFramework.asCuratorFramework())) + curatorFramework.asCuratorFramework(), + testingFatalErrorHandlerResource + .getTestingFatalErrorHandler())) .limit(3) .collect(Collectors.toSet()); @@ -327,13 +337,15 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { }; } - private static ElectionDriver createLeaderElectionDriver(CuratorFramework curatorFramework) { + private static ElectionDriver createLeaderElectionDriver( + CuratorFramework curatorFramework, FatalErrorHandler fatalErrorHandler) { final SimpleLeaderElectionListener leaderElectionListener = new SimpleLeaderElectionListener(); try { final ZooKeeperMultipleComponentLeaderElectionDriver leaderElectionDriver = - createLeaderElectionDriver(leaderElectionListener, curatorFramework); + createLeaderElectionDriver( + leaderElectionListener, curatorFramework, fatalErrorHandler); return new ElectionDriver(leaderElectionDriver, leaderElectionListener); } catch (Exception e) { ExceptionUtils.rethrow(e); @@ -398,10 +410,11 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { private static ZooKeeperMultipleComponentLeaderElectionDriver createLeaderElectionDriver( MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, - CuratorFramework curatorFramework) + CuratorFramework curatorFramework, + FatalErrorHandler fatalErrorHandler) throws Exception { return new ZooKeeperMultipleComponentLeaderElectionDriver( - curatorFramework, leaderElectionListener); + curatorFramework, leaderElectionListener, fatalErrorHandler); } private CuratorFrameworkWithUnhandledErrorListener startCuratorFramework() { @@ -421,7 +434,9 @@ class ZooKeeperMultipleComponentLeaderElectionDriverTest { this.curatorFramework = startCuratorFramework(); this.leaderElectionDriver = createLeaderElectionDriver( - leaderElectionListener, curatorFramework.asCuratorFramework()); + leaderElectionListener, + curatorFramework.asCuratorFramework(), + testingFatalErrorHandlerResource.getTestingFatalErrorHandler()); } protected final void runTest(RunnableWithException test) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java index bd968bd8754..1ae648419f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java @@ -162,7 +162,8 @@ class ZooKeeperLeaderRetrievalTest { testingFatalErrorHandlerResource .getTestingFatalErrorHandler()), ZooKeeperUtils.generateLeaderLatchPath("")), - new TestingLeaderElectionListener()); + new TestingLeaderElectionListener(), + testingFatalErrorHandlerResource.getTestingFatalErrorHandler()); externalProcessDriver.isLeader(); externalProcessDriver.publishLeaderInformation(