This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 3f73e06bfc7319cccad09cdad8bbdf6de5676c77
Author: Yan Zhao <[email protected]>
AuthorDate: Thu Jan 18 16:04:06 2024 +0800

    Fix auditor elector executor block problem. (#4165)
    
    ### Motivation
    Currently, the auditor elector is shut down by submitting a shutdown task 
to its executor.
    In some cases, the follower auditor elector's executor is blocked 
indefinitely while waiting for leader election, so the shutdown task sits in 
the task queue forever and never gets a chance to execute.
    
    This case has been observed in Pulsar. See 
https://github.com/apache/pulsar/pull/21797#discussion_r1436278822
    
    So during auditor elector shutdown, if the remaining tasks can't complete 
in time, we should invoke shutdownNow to interrupt the blocked thread.
    
    (cherry picked from commit c3748dd0bba8d8534ecb443213c287cd9b8736b7)
---
 .../bookkeeper/replication/AuditorElector.java     |  55 +++++----
 .../replication/AuditorLedgerCheckerTest.java      | 130 +++++++++++++++++----
 2 files changed, 141 insertions(+), 44 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index badd5fd26e..3c160bd487 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -24,11 +24,14 @@ import static 
org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -146,26 +149,28 @@ public class AuditorElector {
     /**
      * Run cleanup operations for the auditor elector.
      */
-    private void submitShutdownTask() {
-        executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    if (!running.compareAndSet(true, false)) {
-                        return;
-                    }
-
-                    try {
-                        ledgerAuditorManager.close();
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                        LOG.warn("InterruptedException while closing ledger 
auditor manager", ie);
-                    } catch (Exception ke) {
-                        LOG.error("Exception while closing ledger auditor 
manager", ke);
-                    }
-                }
-            });
+    private Future<?> submitShutdownTask() {
+        return executor.submit(shutdownTask);
     }
 
+    Runnable shutdownTask = new Runnable() {
+        @Override
+        public void run() {
+            if (!running.compareAndSet(true, false)) {
+                return;
+            }
+
+            try {
+                ledgerAuditorManager.close();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.warn("InterruptedException while closing ledger auditor 
manager", ie);
+            } catch (Exception ke) {
+                LOG.error("Exception while closing ledger auditor manager", 
ke);
+            }
+        }
+    };
+
     /**
      * Performing the auditor election using the ZooKeeper ephemeral sequential
      * znode. The bookie which has created the least sequential will be elect 
as
@@ -235,8 +240,18 @@ public class AuditorElector {
                 return;
             }
             // close auditor manager
-            submitShutdownTask();
-            executor.shutdown();
+            try {
+                submitShutdownTask().get(10, TimeUnit.SECONDS);
+                executor.shutdown();
+            } catch (ExecutionException e) {
+                LOG.warn("Failed to close auditor manager", e);
+                executor.shutdownNow();
+                shutdownTask.run();
+            } catch (TimeoutException e) {
+                LOG.warn("Failed to close auditor manager in 10 seconds", e);
+                executor.shutdownNow();
+                shutdownTask.run();
+            }
         }
 
         if (auditor != null) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index ed789320bd..27f2e8b266 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.BookieImpl;
@@ -381,7 +382,16 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         urLedgerMgr.setLostBookieRecoveryDelay(5);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
-        String shutdownBookie = shutDownNonAuditorBookie();
+        AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
+        CountDownLatch shutdownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie = shutDownNonAuditorBookie();
+                shutdownBookieRef.set(shutdownBookie);
+                shutdownLatch.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         LOG.debug("Waiting for ledgers to be marked as under replicated");
         assertFalse("audit of lost bookie isn't delayed", 
underReplicaLatch.await(4, TimeUnit.SECONDS));
@@ -395,9 +405,10 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
                 urLedgerList.contains(ledgerId));
         Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
         String data = urLedgerData.get(ledgerId);
-        assertTrue("Bookie " + shutdownBookie
+        shutdownLatch.await();
+        assertTrue("Bookie " + shutdownBookieRef.get()
                 + "is not listed in the ledger as missing replica :" + data,
-                data.contains(shutdownBookie));
+                data.contains(shutdownBookieRef.get()));
     }
 
     /**
@@ -454,7 +465,16 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         urLedgerMgr.setLostBookieRecoveryDelay(50);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
-        String shutdownBookie = shutDownNonAuditorBookie();
+        AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
+        CountDownLatch shutdownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie = shutDownNonAuditorBookie();
+                shutdownBookieRef.set(shutdownBookie);
+                shutdownLatch.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         LOG.debug("Waiting for ledgers to be marked as under replicated");
         assertFalse("audit of lost bookie isn't delayed", 
underReplicaLatch.await(4, TimeUnit.SECONDS));
@@ -471,9 +491,10 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
                 urLedgerList.contains(ledgerId));
         Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
         String data = urLedgerData.get(ledgerId);
-        assertTrue("Bookie " + shutdownBookie
+        shutdownLatch.await();
+        assertTrue("Bookie " + shutdownBookieRef.get()
                 + "is not listed in the ledger as missing replica :" + data,
-                data.contains(shutdownBookie));
+                data.contains(shutdownBookieRef.get()));
     }
 
     @Test
@@ -494,7 +515,16 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         urLedgerMgr.setLostBookieRecoveryDelay(3);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
-        String shutdownBookie = shutDownNonAuditorBookie();
+        AtomicReference<String> shutdownBookieRef = new AtomicReference<>();
+        CountDownLatch shutdownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie = shutDownNonAuditorBookie();
+                shutdownBookieRef.set(shutdownBookie);
+                shutdownLatch.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         LOG.debug("Waiting for ledgers to be marked as under replicated");
         assertFalse("audit of lost bookie isn't delayed", 
underReplicaLatch.await(2, TimeUnit.SECONDS));
@@ -516,9 +546,10 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
                 urLedgerList.contains(ledgerId));
         Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
         String data = urLedgerData.get(ledgerId);
-        assertTrue("Bookie " + shutdownBookie
+        shutdownLatch.await();
+        assertTrue("Bookie " + shutdownBookieRef.get()
                 + "is not listed in the ledger as missing replica :" + data,
-                data.contains(shutdownBookie));
+                data.contains(shutdownBookieRef.get()));
     }
 
     @Test
@@ -605,7 +636,12 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
-        String shutdownBookie = shutDownNonAuditorBookie();
+        new Thread(() -> {
+            try {
+                shutDownNonAuditorBookie();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         LOG.debug("Waiting for ledgers to be marked as under replicated");
         assertFalse("audit of lost bookie isn't delayed", 
underReplicaLatch.await(2, TimeUnit.SECONDS));
@@ -652,7 +688,12 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
 
         // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
-        String shutdownBookie = shutDownNonAuditorBookie();
+        new Thread(() -> {
+            try {
+                shutDownNonAuditorBookie();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         LOG.debug("Waiting for ledgers to be marked as under replicated");
         assertFalse("audit of lost bookie isn't delayed", 
underReplicaLatch.await(2, TimeUnit.SECONDS));
@@ -700,8 +741,17 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         // wait for 10 seconds before starting the recovery work when a bookie 
fails
         urLedgerMgr.setLostBookieRecoveryDelay(10);
 
-        // shutdown a non auditor bookie to avoid an election
-        String shutdownBookie1 = shutDownNonAuditorBookie();
+        // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
+        AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
+        CountDownLatch shutdownLatch1 = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie1 = shutDownNonAuditorBookie();
+                shutdownBookieRef1.set(shutdownBookie1);
+                shutdownLatch1.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         // wait for 3 seconds and there shouldn't be any under replicated 
ledgers
         // because we have delayed the start of audit by 10 seconds
@@ -713,7 +763,16 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         // the history about having delayed recovery remains. Hence we make 
sure
         // we bring down a non auditor bookie. This should cause the audit to 
take
         // place immediately and not wait for the remaining 7 seconds to elapse
-        String shutdownBookie2 = shutDownNonAuditorBookie();
+        AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
+        CountDownLatch shutdownLatch2 = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie2 = shutDownNonAuditorBookie();
+                shutdownBookieRef2.set(shutdownBookie2);
+                shutdownLatch2.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         // 2 second grace period for the ledgers to get reported as under 
replicated
         Thread.sleep(2000);
@@ -726,9 +785,11 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
                 urLedgerList.contains(ledgerId));
         Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
         String data = urLedgerData.get(ledgerId);
-        assertTrue("Bookie " + shutdownBookie1 + shutdownBookie2
+        shutdownLatch1.await();
+        shutdownLatch2.await();
+        assertTrue("Bookie " + shutdownBookieRef1.get() + 
shutdownBookieRef2.get()
                 + " are not listed in the ledger as missing replicas :" + data,
-                data.contains(shutdownBookie1) && 
data.contains(shutdownBookie2));
+                data.contains(shutdownBookieRef1.get()) && 
data.contains(shutdownBookieRef2.get()));
     }
 
     /**
@@ -756,7 +817,17 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         // shutdown a non auditor bookie to avoid an election
         int idx1 = getShutDownNonAuditorBookieIdx("");
         ServerConfiguration conf1 = confByIndex(idx1);
-        String shutdownBookie1 = shutdownBookie(idx1);
+
+        AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>();
+        CountDownLatch shutdownLatch1 = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie1 = shutdownBookie(idx1);
+                shutdownBookieRef1.set(shutdownBookie1);
+                shutdownLatch1.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
 
         // wait for 2 seconds and there shouldn't be any under replicated 
ledgers
         // because we have delayed the start of audit by 5 seconds
@@ -769,8 +840,17 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
 
         // Now to simulate the rolling upgrade, bring down a bookie different 
from
         // the one we brought down/up above.
-        String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1);
-
+        // shutdown a non auditor bookie; choosing non-auditor to avoid 
another election
+        AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>();
+        CountDownLatch shutdownLatch2 = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                String shutdownBookie2 = shutDownNonAuditorBookie();
+                shutdownBookieRef2.set(shutdownBookie2);
+                shutdownLatch2.countDown();
+            } catch (Exception ignore) {
+            }
+        }).start();
         // since the first bookie that was brought down/up has come up, there 
is only
         // one bookie down at this time. Hence the lost bookie check shouldn't 
start
         // immediately; it will start 5 seconds after the second bookie went 
down
@@ -787,11 +867,13 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
                 urLedgerList.contains(ledgerId));
         Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
         String data = urLedgerData.get(ledgerId);
-        assertTrue("Bookie " + shutdownBookie1 + "wrongly listed as missing 
the ledger: " + data,
-                   !data.contains(shutdownBookie1));
-        assertTrue("Bookie " + shutdownBookie2
+        shutdownLatch1.await();
+        shutdownLatch2.await();
+        assertTrue("Bookie " + shutdownBookieRef1.get() + "wrongly listed as 
missing the ledger: " + data,
+                   !data.contains(shutdownBookieRef1.get()));
+        assertTrue("Bookie " + shutdownBookieRef2.get()
                    + " is not listed in the ledger as missing replicas :" + 
data,
-                   data.contains(shutdownBookie2));
+                   data.contains(shutdownBookieRef2.get()));
         LOG.info("*****************Test Complete");
     }
 
@@ -952,7 +1034,7 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         return auditorElectors.get(bookieAddr).auditor;
     }
 
-    private String  shutDownNonAuditorBookie() throws Exception {
+    private String shutDownNonAuditorBookie() throws Exception {
         // shutdown bookie which is not an auditor
         int indexOf = indexOfServer(getAuditorBookie());
         int bkIndexDownBookie;

Reply via email to