This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b814c369256f011f18fc7283d60cd4a27593d988
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Tue Apr 18 16:17:02 2023 +0200

    [FLINK-31878][runtime] Adds event processing in a separate thread to 
DefaultLeaderElectionService
    
    Signed-off-by: Matthias Pohl <matthias.p...@aiven.io>
---
 .../DefaultLeaderElectionService.java              |  62 ++++-
 .../leaderelection/LeaderElectionEventHandler.java |   6 +
 .../DefaultLeaderElectionServiceTest.java          | 280 ++++++++++++++++++---
 3 files changed, 303 insertions(+), 45 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 19a56fcbd26..f75896a6967 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -20,7 +20,10 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +33,10 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -67,7 +74,20 @@ public class DefaultLeaderElectionService
     // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    private final ExecutorService leadershipOperationExecutor;
+
+    /**
+     * Creates the service with a dedicated single-thread executor on which the leader election
+     * events reported to this service are processed one after another.
+     */
     public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
+        this(
+                leaderElectionDriverFactory,
+                Executors.newSingleThreadExecutor(
+                        new ExecutorThreadFactory(
+                                
"DefaultLeaderElectionService-leadershipOperationExecutor")));
+    }
+
+    @VisibleForTesting
+    DefaultLeaderElectionService(
+            LeaderElectionDriverFactory leaderElectionDriverFactory,
+            ExecutorService leadershipOperationExecutor) {
         this.leaderElectionDriverFactory = 
checkNotNull(leaderElectionDriverFactory);
 
         this.leaderContender = null;
@@ -79,6 +99,8 @@ public class DefaultLeaderElectionService
         this.confirmedLeaderInformation = LeaderInformation.empty();
 
         this.running = false;
+
+        this.leadershipOperationExecutor = leadershipOperationExecutor;
     }
 
     @Override
@@ -118,6 +140,10 @@ public class DefaultLeaderElectionService
         }
 
         leaderElectionDriver.close();
+
+        // graceful shutdown needs to happen outside the lock to enable any 
outstanding
+        // grant/revoke events to be processed without the lock being acquired 
by the service
+        ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
leadershipOperationExecutor);
     }
 
     @Override
@@ -184,6 +210,10 @@ public class DefaultLeaderElectionService
 
+    // Submits the event to leadershipOperationExecutor (a dedicated single thread when the public
+    // constructor is used); onGrantLeadershipInternal does the actual processing.
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        runInLeaderEventThread(() -> 
onGrantLeadershipInternal(newLeaderSessionId));
+    }
+
+    private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
         synchronized (lock) {
             if (running) {
                 issuedLeaderSessionID = newLeaderSessionId;
@@ -205,6 +235,10 @@ public class DefaultLeaderElectionService
 
+    // Submits the event to leadershipOperationExecutor (a dedicated single thread when the public
+    // constructor is used); onRevokeLeadershipInternal does the actual processing.
     @Override
     public void onRevokeLeadership() {
+        runInLeaderEventThread(this::onRevokeLeadershipInternal);
+    }
+
+    private void onRevokeLeadershipInternal() {
         synchronized (lock) {
             if (running) {
                 handleLeadershipLoss();
@@ -233,6 +267,10 @@ public class DefaultLeaderElectionService
 
+    // Submits the event to leadershipOperationExecutor (a dedicated single thread when the public
+    // constructor is used); onLeaderInformationChangeInternal does the actual processing.
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) 
{
+        runInLeaderEventThread(() -> 
onLeaderInformationChangeInternal(leaderInformation));
+    }
+
+    private void onLeaderInformationChangeInternal(LeaderInformation 
leaderInformation) {
         synchronized (lock) {
             if (running) {
                 LOG.trace(
@@ -257,12 +295,28 @@ public class DefaultLeaderElectionService
                 }
             } else {
                 LOG.debug(
-                        "Ignoring change notification since the {} has " + 
"already been closed.",
+                        "Ignoring change notification since the {} has already 
been closed.",
                         leaderElectionDriver);
             }
         }
     }
 
+    /**
+     * Submits {@code callback} to the {@code leadershipOperationExecutor} if the service is still
+     * running; events arriving after the service was stopped are dropped. With the executor
+     * created by the public constructor, events are processed one after another on a single
+     * dedicated thread. Failures of the callback are forwarded to the leader contender.
+     *
+     * <p>{@code running} is read without holding the lock, and the service's shutdown calls
+     * {@code ExecutorUtils.gracefulShutdown} on the executor outside the lock as well. Hence, the
+     * executor can be shut down between the check and the submission. The resulting {@link
+     * RejectedExecutionException} must not escape into the thread that reported the event: the
+     * {@code *Internal} handlers ignore events once {@code running} is false anyway. The lock is
+     * intentionally not acquired here so that reporting an event never waits for the processing
+     * of a previous one.
+     */
+    private void runInLeaderEventThread(Runnable callback) {
+        if (running) {
+            try {
+                FutureUtils.handleUncaughtException(
+                        CompletableFuture.runAsync(callback, leadershipOperationExecutor),
+                        (thread, error) -> forwardErrorToLeaderContender(error));
+            } catch (RejectedExecutionException e) {
+                if (running) {
+                    // the executor rejected the event although the service wasn't stopped
+                    forwardErrorToLeaderContender(e);
+                } else {
+                    LOG.debug(
+                            "Leader election event is ignored since the service was stopped.",
+                            e);
+                }
+            }
+        }
+    }
+
+    private void forwardErrorToLeaderContender(Throwable t) {
+        if (t instanceof LeaderElectionException) {
+            leaderContender.handleError((LeaderElectionException) t);
+        } else {
+            leaderContender.handleError(new LeaderElectionException(t));
+        }
+    }
+
     private class LeaderElectionFatalErrorHandler implements FatalErrorHandler 
{
 
         @Override
@@ -273,11 +327,7 @@ public class DefaultLeaderElectionService
                     return;
                 }
 
-                if (throwable instanceof LeaderElectionException) {
-                    leaderContender.handleError((LeaderElectionException) 
throwable);
-                } else {
-                    leaderContender.handleError(new 
LeaderElectionException(throwable));
-                }
+                forwardErrorToLeaderContender(throwable);
             }
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
index 9e3fa314287..15f66a1bc54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.util.UUID;
 
 /**
@@ -29,7 +31,11 @@ import java.util.UUID;
  * LeaderElectionDriver#close()}. This means that the implementor of {@link
  * LeaderElectionEventHandler} is responsible for filtering out spurious 
callbacks(e.g. after close
  * has been called on {@link LeaderElectionDriver}).
+ *
+ * <p>The order of events matters. Therefore, the event processing methods of this interface must
+ * be called from a single thread.
  */
+@NotThreadSafe
 public interface LeaderElectionEventHandler {
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index 358dfcbfe2c..17bc0787d22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -18,13 +18,21 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -37,7 +45,7 @@ class DefaultLeaderElectionServiceTest {
     void testOnGrantAndRevokeLeadership() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             // grant leadership
                             testingLeaderElectionDriver.isLeader();
@@ -73,7 +81,7 @@ class DefaultLeaderElectionServiceTest {
     void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
                             testingContender.waitForLeader();
@@ -105,7 +113,7 @@ class DefaultLeaderElectionServiceTest {
     void testLeaderInformationChangedAndShouldBeCorrected() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
 
@@ -130,22 +138,107 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testHasLeadership() throws Exception {
+    void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws 
Exception {
         new Context() {
             {
-                runTest(
-                        () -> {
-                            testingLeaderElectionDriver.isLeader();
-                            final UUID currentLeaderSessionId =
-                                    leaderElectionService.getLeaderSessionID();
-                            assertThat(currentLeaderSessionId).isNotNull();
-                            
assertThat(leaderElectionService.hasLeadership(currentLeaderSessionId))
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            final UUID expectedSessionID = UUID.randomUUID();
+
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            // executorService isn't triggered: the grant event stays queued, so
+                            // the service mustn't report leadership for any session ID yet
+
+                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                                    .isFalse();
+                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                                    .isFalse();
+                        });
+            }
+        };
+    }
+
+    @Test
+    void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws 
Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            final UUID expectedSessionID = UUID.randomUUID();
+
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            // process the queued grant event
+                            executorService.trigger();
+
+                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
                                     .isTrue();
                             
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
                                     .isFalse();
+                        });
+            }
+        };
+    }
+
+    @Test
+    void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws 
Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            final UUID expectedSessionID = UUID.randomUUID();
+
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            // process the queued grant event
+                            executorService.trigger();
+                            testingLeaderElectionDriver.notLeader();
+                            // the revoke event is queued but not processed (no second trigger)
+
+                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                                    .as(
+                                            "No operation should be handled 
anymore after the HA backend "
+                                                    + "indicated leadership 
loss even if the onRevokeLeadership wasn't "
+                                                    + "processed, yet, because 
some other process could have picked up "
+                                                    + "the leadership in the 
meantime already based on the HA "
+                                                    + "backend's decision.")
+                                    .isFalse();
+                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                                    .isFalse();
+                        });
+            }
+        };
+    }
+
+    @Test
+    void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws 
Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            final UUID expectedSessionID = UUID.randomUUID();
+
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            // process the queued grant event
+                            executorService.trigger();
+
+                            testingLeaderElectionDriver.notLeader();
+                            // process the queued revoke event
+                            executorService.trigger();
+
+                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
+                                    .isFalse();
+                            
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()))
+                                    .isFalse();
+                        });
+            }
+        };
+    }
+
+    @Test
+    void testHasLeadershipAfterStop() throws Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            final UUID expectedSessionID = UUID.randomUUID();
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            executorService.trigger();
 
                             leaderElectionService.stop();
-                            
assertThat(leaderElectionService.hasLeadership(currentLeaderSessionId))
+
+                            
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
                                     .isFalse();
                         });
             }
@@ -156,7 +249,7 @@ class DefaultLeaderElectionServiceTest {
     void testLeaderInformationChangedIfNotBeingLeader() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             final LeaderInformation faultyLeader =
                                     LeaderInformation.known(UUID.randomUUID(), 
"faulty-address");
@@ -173,10 +266,15 @@ class DefaultLeaderElectionServiceTest {
     void testOnGrantLeadershipIsIgnoredAfterBeingStop() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             leaderElectionService.stop();
                             testingLeaderElectionDriver.isLeader();
+
+                            
assertThat(leaderElectionService.getLeaderSessionID())
+                                    .as(
+                                            "The grant event shouldn't have 
been processed by the LeaderElectionService.")
+                                    .isNull();
                             // leader contender is not granted leadership
                             
assertThat(testingContender.getLeaderSessionID()).isNull();
                         });
@@ -188,7 +286,7 @@ class DefaultLeaderElectionServiceTest {
     void testOnLeaderInformationChangeIsIgnoredAfterBeingStop() throws 
Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
 
@@ -208,7 +306,7 @@ class DefaultLeaderElectionServiceTest {
     void testOnRevokeLeadershipIsTriggeredAfterBeingStop() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
                             final UUID oldSessionId = 
leaderElectionService.getLeaderSessionID();
@@ -230,7 +328,7 @@ class DefaultLeaderElectionServiceTest {
     void testOldConfirmLeaderInformation() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
                             final UUID currentLeaderSessionId =
@@ -250,7 +348,7 @@ class DefaultLeaderElectionServiceTest {
     void testErrorForwarding() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             final Exception testException = new 
Exception("test leader exception");
 
@@ -269,7 +367,7 @@ class DefaultLeaderElectionServiceTest {
     void testErrorIsIgnoredAfterBeingStop() throws Exception {
         new Context() {
             {
-                runTest(
+                runTestWithSynchronousEventHandling(
                         () -> {
                             final Exception testException = new 
Exception("test leader exception");
 
@@ -301,35 +399,139 @@ class DefaultLeaderElectionServiceTest {
                 Preconditions.checkNotNull(
                         
testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
 
-        final CheckedThread isLeaderThread =
-                new CheckedThread() {
-                    @Override
-                    public void go() {
-                        currentLeaderDriver.isLeader();
-                    }
-                };
-        isLeaderThread.start();
+        currentLeaderDriver.isLeader();
 
         leaderElectionService.stop();
-        isLeaderThread.sync();
+    }
+
+    /**
+     * The driver blocks on a latch for every leader information write after the first one. Reporting
+     * a leader information change must nevertheless return without waiting for that write.
+     */
+    @Test
+    void testOnLeadershipChangeDoesNotBlock() throws Exception {
+        final CompletableFuture<LeaderInformation> initialLeaderInformation =
+                new CompletableFuture<>();
+        final OneShotLatch latch = new OneShotLatch();
+
+        final TestingGenericLeaderElectionDriver driver =
+                TestingGenericLeaderElectionDriver.newBuilder()
+                        .setWriteLeaderInformationConsumer(
+                                leaderInformation -> {
+                                    // the first call saves the confirmed LeaderInformation; any
+                                    // subsequent call blocks until the latch is triggered
+                                    if (!initialLeaderInformation.isDone()) {
+                                        initialLeaderInformation.complete(leaderInformation);
+                                    } else {
+                                        latch.awaitQuietly();
+                                    }
+                                })
+                        .setHasLeadershipSupplier(() -> true)
+                        .build();
+
+        final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(
+                        (leaderElectionEventHandler, errorHandler) -> driver);
+
+        final String address = "leader-address";
+        testInstance.start(
+                TestingGenericLeaderContender.newBuilder()
+                        .setGrantLeadershipConsumer(
+                                sessionID -> testInstance.confirmLeadership(sessionID, address))
+                        .build());
+
+        try {
+            // initial messages to initialize usedLeaderSessionID and confirmedLeaderInformation
+            final UUID sessionID = UUID.randomUUID();
+            testInstance.onGrantLeadership(sessionID);
+
+            FlinkAssertions.assertThatFuture(initialLeaderInformation)
+                    .eventuallySucceeds()
+                    .as("The LeaderInformation should have been forwarded to the driver.")
+                    .isEqualTo(LeaderInformation.known(sessionID, address));
+
+            // this call shouldn't block the test execution
+            testInstance.onLeaderInformationChange(LeaderInformation.empty());
+        } finally {
+            // release a possibly blocked write before stopping, even if an assertion failed, so
+            // that the event thread isn't left waiting on the latch
+            latch.trigger();
+            testInstance.stop();
+        }
+    }
+
+    @Test
+    void testOnGrantLeadershipAsyncDoesNotBlock() throws Exception {
+        testNonBlockingCall(
+                latch ->
+                        TestingGenericLeaderContender.newBuilder()
+                                .setGrantLeadershipConsumer(
+                                        ignoredSessionID -> 
latch.awaitQuietly())
+                                .build(),
+                TestingLeaderElectionDriver::isLeader);
+    }
+
+    @Test
+    void testOnRevokeLeadershipDoesNotBlock() throws Exception {
+        testNonBlockingCall(
+                latch ->
+                        TestingGenericLeaderContender.newBuilder()
+                                
.setRevokeLeadershipRunnable(latch::awaitQuietly)
+                                .build(),
+                driver -> {
+                    driver.isLeader();
+                    // this call should not block the test execution
+                    driver.notLeader();
+                });
+    }
+
+    /**
+     * Starts a service whose contender is created by {@code contenderCreator} (which is expected to
+     * make the contender block on the passed latch) and runs {@code driverAction} against the
+     * service's driver. The test hangs if {@code driverAction} waits for the blocked contender
+     * callback, because the latch is only triggered afterwards.
+     *
+     * <p>The latch is triggered and the service stopped in any case, so that a failing assertion
+     * doesn't leave the event thread blocked on the latch.
+     */
+    private static void testNonBlockingCall(
+            Function<OneShotLatch, TestingGenericLeaderContender> contenderCreator,
+            Consumer<TestingLeaderElectionDriver> driverAction)
+            throws Exception {
+        final OneShotLatch latch = new OneShotLatch();
+        final TestingGenericLeaderContender contender = contenderCreator.apply(latch);
+
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory);
+        testInstance.start(contender);
+
+        try {
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driverAction.accept(driver);
+        } finally {
+            latch.trigger();
+            testInstance.stop();
+        }
+    }
 
     private static class Context {
-        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
-                testingLeaderElectionDriverFactory =
-                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
-        final DefaultLeaderElectionService leaderElectionService =
-                new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
-        final TestingContender testingContender =
-                new TestingContender(TEST_URL, leaderElectionService);
+
+        DefaultLeaderElectionService leaderElectionService;
+        TestingContender testingContender;
 
         TestingLeaderElectionDriver testingLeaderElectionDriver;
 
-        void runTest(RunnableWithException testMethod) throws Exception {
+        void runTestWithSynchronousEventHandling(RunnableWithException 
testMethod)
+                throws Exception {
+            runTest(testMethod, Executors.newDirectExecutorService());
+        }
+
+        /**
+         * Runs {@code testMethod} with an executor that only processes the queued leader election
+         * events when the test triggers them; the executor is handed to {@code testMethod}.
+         */
+        void runTestWithManuallyTriggeredEvents(
+                ThrowingConsumer<ManuallyTriggeredScheduledExecutorService, Exception> testMethod)
+                throws Exception {
+            final ManuallyTriggeredScheduledExecutorService manualExecutor =
+                    new ManuallyTriggeredScheduledExecutorService();
+            final RunnableWithException testWithExecutor =
+                    () -> testMethod.accept(manualExecutor);
+            runTest(testWithExecutor, manualExecutor);
+        }
+
+        void runTest(RunnableWithException testMethod, ExecutorService 
leaderEventOperationExecutor)
+                throws Exception {
+            final 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                    new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+            leaderElectionService =
+                    new DefaultLeaderElectionService(driverFactory, 
leaderEventOperationExecutor);
+            testingContender = new TestingContender(TEST_URL, 
leaderElectionService);
+
             leaderElectionService.start(testingContender);
+            testingLeaderElectionDriver = 
driverFactory.getCurrentLeaderDriver();
 
-            testingLeaderElectionDriver =
-                    
testingLeaderElectionDriverFactory.getCurrentLeaderDriver();
             assertThat(testingLeaderElectionDriver).isNotNull();
             testMethod.run();
 

Reply via email to