This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b814c369256f011f18fc7283d60cd4a27593d988 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Tue Apr 18 16:17:02 2023 +0200 [FLINK-31878][runtime] Adds event processing in a separate thread to DefaultLeaderElectionService Signed-off-by: Matthias Pohl <matthias.p...@aiven.io> --- .../DefaultLeaderElectionService.java | 62 ++++- .../leaderelection/LeaderElectionEventHandler.java | 6 + .../DefaultLeaderElectionServiceTest.java | 280 ++++++++++++++++++--- 3 files changed, 303 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 19a56fcbd26..f75896a6967 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -20,7 +20,10 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,10 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,7 +74,20 @@ public class DefaultLeaderElectionService // this.running=true ensures that leaderContender != null private LeaderElectionDriver leaderElectionDriver; + private final ExecutorService leadershipOperationExecutor; + public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) { + this( + leaderElectionDriverFactory, + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + "DefaultLeaderElectionService-leadershipOperationExecutor"))); + } + + @VisibleForTesting + DefaultLeaderElectionService( + LeaderElectionDriverFactory leaderElectionDriverFactory, + ExecutorService leadershipOperationExecutor) { this.leaderElectionDriverFactory = checkNotNull(leaderElectionDriverFactory); this.leaderContender = null; @@ -79,6 +99,8 @@ public class DefaultLeaderElectionService this.confirmedLeaderInformation = LeaderInformation.empty(); this.running = false; + + this.leadershipOperationExecutor = leadershipOperationExecutor; } @Override @@ -118,6 +140,10 @@ public class DefaultLeaderElectionService } leaderElectionDriver.close(); + + // graceful shutdown needs to happen outside the lock to enable any outstanding + // grant/revoke events to be processed without the lock being acquired by the service + ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, leadershipOperationExecutor); } @Override @@ -184,6 +210,10 @@ public class DefaultLeaderElectionService @Override public void onGrantLeadership(UUID newLeaderSessionId) { + runInLeaderEventThread(() -> onGrantLeadershipInternal(newLeaderSessionId)); + } + + private void onGrantLeadershipInternal(UUID newLeaderSessionId) { synchronized (lock) { if (running) { issuedLeaderSessionID = newLeaderSessionId; @@ -205,6 +235,10 @@ public class DefaultLeaderElectionService @Override public void onRevokeLeadership() { + runInLeaderEventThread(this::onRevokeLeadershipInternal); + } + + private void onRevokeLeadershipInternal() { synchronized (lock) { if (running) { handleLeadershipLoss(); @@ -233,6 +267,10 @@ public class DefaultLeaderElectionService @Override public void onLeaderInformationChange(LeaderInformation leaderInformation) { + runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation)); + } + + private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) { synchronized (lock) { if (running) { LOG.trace( @@ -257,12 +295,28 @@ public class DefaultLeaderElectionService } } else { LOG.debug( - "Ignoring change notification since the {} has " + "already been closed.", + "Ignoring change notification since the {} has already been closed.", leaderElectionDriver); } } } + private void runInLeaderEventThread(Runnable callback) { + if (running) { + FutureUtils.handleUncaughtException( + CompletableFuture.runAsync(callback, leadershipOperationExecutor), + (thread, error) -> forwardErrorToLeaderContender(error)); + } + } + + private void forwardErrorToLeaderContender(Throwable t) { + if (t instanceof LeaderElectionException) { + leaderContender.handleError((LeaderElectionException) t); + } else { + leaderContender.handleError(new LeaderElectionException(t)); + } + } + private class LeaderElectionFatalErrorHandler implements FatalErrorHandler { @Override @@ -273,11 +327,7 @@ public class DefaultLeaderElectionService return; } - if (throwable instanceof LeaderElectionException) { - leaderContender.handleError((LeaderElectionException) throwable); - } else { - leaderContender.handleError(new LeaderElectionException(throwable)); - } + forwardErrorToLeaderContender(throwable); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java index 9e3fa314287..15f66a1bc54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.leaderelection; +import javax.annotation.concurrent.NotThreadSafe; + import java.util.UUID; /** @@ -29,7 +31,11 @@ import java.util.UUID; * LeaderElectionDriver#close()}. This means that the implementor of {@link * LeaderElectionEventHandler} is responsible for filtering out spurious callbacks(e.g. after close * has been called on {@link LeaderElectionDriver}). + * + * <p>The order of events matters. Therefore, calling event processing functions of this interface + * should happen in a single-thread environment. */ +@NotThreadSafe public interface LeaderElectionEventHandler { /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index 358dfcbfe2c..17bc0787d22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -18,13 +18,21 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -37,7 +45,7 @@ class DefaultLeaderElectionServiceTest { void testOnGrantAndRevokeLeadership() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { // grant leadership testingLeaderElectionDriver.isLeader(); @@ -73,7 +81,7 @@ class DefaultLeaderElectionServiceTest { void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); testingContender.waitForLeader(); @@ -105,7 +113,7 @@ class DefaultLeaderElectionServiceTest { void testLeaderInformationChangedAndShouldBeCorrected() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); @@ -130,22 +138,107 @@ class DefaultLeaderElectionServiceTest { } @Test - void testHasLeadership() throws Exception { + void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception { new Context() { { - runTest( - () -> { - testingLeaderElectionDriver.isLeader(); - final UUID currentLeaderSessionId = - leaderElectionService.getLeaderSessionID(); - assertThat(currentLeaderSessionId).isNotNull(); - assertThat(leaderElectionService.hasLeadership(currentLeaderSessionId)) + runTestWithManuallyTriggeredEvents( + executorService -> { + final UUID expectedSessionID = UUID.randomUUID(); + + testingLeaderElectionDriver.isLeader(expectedSessionID); + + assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + .isFalse(); + assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + .isFalse(); + }); + } + }; + } + + @Test + void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + final UUID expectedSessionID = UUID.randomUUID(); + + testingLeaderElectionDriver.isLeader(expectedSessionID); + executorService.trigger(); + + assertThat(leaderElectionService.hasLeadership(expectedSessionID)) .isTrue(); assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) .isFalse(); + }); + } + }; + } + + @Test + void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + final UUID expectedSessionID = UUID.randomUUID(); + + testingLeaderElectionDriver.isLeader(expectedSessionID); + executorService.trigger(); + testingLeaderElectionDriver.notLeader(); + + assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + .as( + "No operation should be handled anymore after the HA backend " + + "indicated leadership loss even if the onRevokeLeadership wasn't " + + "processed, yet, because some other process could have picked up " + + "the leadership in the meantime already based on the HA " + + "backend's decision.") + .isFalse(); + assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + .isFalse(); + }); + } + }; + } + + @Test + void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + final UUID expectedSessionID = UUID.randomUUID(); + + testingLeaderElectionDriver.isLeader(expectedSessionID); + executorService.trigger(); + + testingLeaderElectionDriver.notLeader(); + executorService.trigger(); + + assertThat(leaderElectionService.hasLeadership(expectedSessionID)) + .isFalse(); + assertThat(leaderElectionService.hasLeadership(UUID.randomUUID())) + .isFalse(); + }); + } + }; + } + + @Test + void testHasLeadershipAfterStop() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + final UUID expectedSessionID = UUID.randomUUID(); + testingLeaderElectionDriver.isLeader(expectedSessionID); + executorService.trigger(); leaderElectionService.stop(); - assertThat(leaderElectionService.hasLeadership(currentLeaderSessionId)) + + assertThat(leaderElectionService.hasLeadership(expectedSessionID)) .isFalse(); }); } @@ -156,7 +249,7 @@ class DefaultLeaderElectionServiceTest { void testLeaderInformationChangedIfNotBeingLeader() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { final LeaderInformation faultyLeader = LeaderInformation.known(UUID.randomUUID(), "faulty-address"); @@ -173,10 +266,15 @@ class DefaultLeaderElectionServiceTest { void testOnGrantLeadershipIsIgnoredAfterBeingStop() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { leaderElectionService.stop(); testingLeaderElectionDriver.isLeader(); + + assertThat(leaderElectionService.getLeaderSessionID()) + .as( + "The grant event shouldn't have been processed by the LeaderElectionService.") + .isNull(); // leader contender is not granted leadership assertThat(testingContender.getLeaderSessionID()).isNull(); }); @@ -188,7 +286,7 @@ class DefaultLeaderElectionServiceTest { void testOnLeaderInformationChangeIsIgnoredAfterBeingStop() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); @@ -208,7 +306,7 @@ class DefaultLeaderElectionServiceTest { void testOnRevokeLeadershipIsTriggeredAfterBeingStop() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); final UUID oldSessionId = leaderElectionService.getLeaderSessionID(); @@ -230,7 +328,7 @@ class DefaultLeaderElectionServiceTest { void testOldConfirmLeaderInformation() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { testingLeaderElectionDriver.isLeader(); final UUID currentLeaderSessionId = @@ -250,7 +348,7 @@ class DefaultLeaderElectionServiceTest { void testErrorForwarding() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { final Exception testException = new Exception("test leader exception"); @@ -269,7 +367,7 @@ class DefaultLeaderElectionServiceTest { void testErrorIsIgnoredAfterBeingStop() throws Exception { new Context() { { - runTest( + runTestWithSynchronousEventHandling( () -> { final Exception testException = new Exception("test leader exception"); @@ -301,35 +399,139 @@ class DefaultLeaderElectionServiceTest { Preconditions.checkNotNull( testingLeaderElectionDriverFactory.getCurrentLeaderDriver()); - final CheckedThread isLeaderThread = - new CheckedThread() { - @Override - public void go() { - currentLeaderDriver.isLeader(); - } - }; - isLeaderThread.start(); + currentLeaderDriver.isLeader(); leaderElectionService.stop(); - isLeaderThread.sync(); + } + + @Test + void testOnLeadershipChangeDoesNotBlock() throws Exception { + final CompletableFuture<LeaderInformation> initialLeaderInformation = + new CompletableFuture<>(); + final OneShotLatch latch = new OneShotLatch(); + + final TestingGenericLeaderElectionDriver driver = + TestingGenericLeaderElectionDriver.newBuilder() + .setWriteLeaderInformationConsumer( + leaderInformation -> { + // the first call saves the confirmed LeaderInformation + if (!initialLeaderInformation.isDone()) { + initialLeaderInformation.complete(leaderInformation); + } else { + latch.awaitQuietly(); + } + }) + .setHasLeadershipSupplier(() -> true) + .build(); + + final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService( + (leaderElectionEventHandler, errorHandler) -> driver); + + final String address = "leader-address"; + testInstance.start( + TestingGenericLeaderContender.newBuilder() + .setGrantLeadershipConsumer( + sessionID -> testInstance.confirmLeadership(sessionID, address)) + .build()); + + // initial messages to initialize usedLeaderSessionID and confirmedLeaderInformation + final UUID sessionID = UUID.randomUUID(); + testInstance.onGrantLeadership(sessionID); + + FlinkAssertions.assertThatFuture(initialLeaderInformation) + .eventuallySucceeds() + .as("The LeaderInformation should have been forwarded to the driver.") + .isEqualTo(LeaderInformation.known(sessionID, address)); + + // this call shouldn't block the test execution + testInstance.onLeaderInformationChange(LeaderInformation.empty()); + + latch.trigger(); + + testInstance.stop(); + } + + @Test + void testOnGrantLeadershipAsyncDoesNotBlock() throws Exception { + testNonBlockingCall( + latch -> + TestingGenericLeaderContender.newBuilder() + .setGrantLeadershipConsumer( + ignoredSessionID -> latch.awaitQuietly()) + .build(), + TestingLeaderElectionDriver::isLeader); + } + + @Test + void testOnRevokeLeadershipDoesNotBlock() throws Exception { + testNonBlockingCall( + latch -> + TestingGenericLeaderContender.newBuilder() + .setRevokeLeadershipRunnable(latch::awaitQuietly) + .build(), + driver -> { + driver.isLeader(); + // this call should not block the test execution + driver.notLeader(); + }); + } + + private static void testNonBlockingCall( + Function<OneShotLatch, TestingGenericLeaderContender> contenderCreator, + Consumer<TestingLeaderElectionDriver> driverAction) + throws Exception { + final OneShotLatch latch = new OneShotLatch(); + final TestingGenericLeaderContender contender = contenderCreator.apply(latch); + + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + + final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService(driverFactory); + testInstance.start(contender); + + final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver(); + assertThat(driver).isNotNull(); + + driverAction.accept(driver); + + latch.trigger(); + + testInstance.stop(); } private static class Context { - final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory - testingLeaderElectionDriverFactory = - new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); - final DefaultLeaderElectionService leaderElectionService = - new DefaultLeaderElectionService(testingLeaderElectionDriverFactory); - final TestingContender testingContender = - new TestingContender(TEST_URL, leaderElectionService); + + DefaultLeaderElectionService leaderElectionService; + TestingContender testingContender; TestingLeaderElectionDriver testingLeaderElectionDriver; - void runTest(RunnableWithException testMethod) throws Exception { + void runTestWithSynchronousEventHandling(RunnableWithException testMethod) + throws Exception { + runTest(testMethod, Executors.newDirectExecutorService()); + } + + void runTestWithManuallyTriggeredEvents( + ThrowingConsumer<ManuallyTriggeredScheduledExecutorService, Exception> testMethod) + throws Exception { + final ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); + runTest(() -> testMethod.accept(executorService), executorService); + } + + void runTest(RunnableWithException testMethod, ExecutorService leaderEventOperationExecutor) + throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + leaderElectionService = + new DefaultLeaderElectionService(driverFactory, leaderEventOperationExecutor); + testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.start(testingContender); + testingLeaderElectionDriver = driverFactory.getCurrentLeaderDriver(); - testingLeaderElectionDriver = - testingLeaderElectionDriverFactory.getCurrentLeaderDriver(); assertThat(testingLeaderElectionDriver).isNotNull(); testMethod.run();