This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 3f73e06bfc7319cccad09cdad8bbdf6de5676c77 Author: Yan Zhao <[email protected]> AuthorDate: Thu Jan 18 16:04:06 2024 +0800 Fix auditor elector executor block problem. (#4165) ### Motivation Currently, when we shut down the auditor elector, the shutdown is performed by submitting a shutdown task to its executor. In some cases, the follower auditor elector's executor is blocked indefinitely while waiting for leader election, so the shutdown task stays in the task queue forever and never gets a chance to execute. This case happens in Pulsar. See https://github.com/apache/pulsar/pull/21797#discussion_r1436278822 So, during the auditor elector shutdown, if the remaining tasks can't be completed in time, we should invoke shutdownNow to interrupt the blocked thread. (cherry picked from commit c3748dd0bba8d8534ecb443213c287cd9b8736b7) --- .../bookkeeper/replication/AuditorElector.java | 55 +++++---- .../replication/AuditorLedgerCheckerTest.java | 130 +++++++++++++++++---- 2 files changed, 141 insertions(+), 44 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java index badd5fd26e..3c160bd487 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java @@ -24,11 +24,14 @@ import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -146,26 +149,28 @@ public class AuditorElector { /** * Run cleanup operations for the auditor elector. */ - private void submitShutdownTask() { - executor.submit(new Runnable() { - @Override - public void run() { - if (!running.compareAndSet(true, false)) { - return; - } - - try { - ledgerAuditorManager.close(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("InterruptedException while closing ledger auditor manager", ie); - } catch (Exception ke) { - LOG.error("Exception while closing ledger auditor manager", ke); - } - } - }); + private Future<?> submitShutdownTask() { + return executor.submit(shutdownTask); } + Runnable shutdownTask = new Runnable() { + @Override + public void run() { + if (!running.compareAndSet(true, false)) { + return; + } + + try { + ledgerAuditorManager.close(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("InterruptedException while closing ledger auditor manager", ie); + } catch (Exception ke) { + LOG.error("Exception while closing ledger auditor manager", ke); + } + } + }; + /** * Performing the auditor election using the ZooKeeper ephemeral sequential * znode. 
The bookie which has created the least sequential will be elect as @@ -235,8 +240,18 @@ public class AuditorElector { return; } // close auditor manager - submitShutdownTask(); - executor.shutdown(); + try { + submitShutdownTask().get(10, TimeUnit.SECONDS); + executor.shutdown(); + } catch (ExecutionException e) { + LOG.warn("Failed to close auditor manager", e); + executor.shutdownNow(); + shutdownTask.run(); + } catch (TimeoutException e) { + LOG.warn("Failed to close auditor manager in 10 seconds", e); + executor.shutdownNow(); + shutdownTask.run(); + } } if (auditor != null) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java index ed789320bd..27f2e8b266 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.bookie.BookieImpl; @@ -381,7 +382,16 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerMgr.setLostBookieRecoveryDelay(5); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference<String> shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); 
LOG.debug("Waiting for ledgers to be marked as under replicated"); assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(4, TimeUnit.SECONDS)); @@ -395,9 +405,10 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerList.contains(ledgerId)); Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } /** @@ -454,7 +465,16 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerMgr.setLostBookieRecoveryDelay(50); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference<String> shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); LOG.debug("Waiting for ledgers to be marked as under replicated"); assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(4, TimeUnit.SECONDS)); @@ -471,9 +491,10 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerList.contains(ledgerId)); Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } @Test @@ -494,7 +515,16 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase { urLedgerMgr.setLostBookieRecoveryDelay(3); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + AtomicReference<String> shutdownBookieRef = new AtomicReference<>(); + CountDownLatch shutdownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie = shutDownNonAuditorBookie(); + shutdownBookieRef.set(shutdownBookie); + shutdownLatch.countDown(); + } catch (Exception ignore) { + } + }).start(); LOG.debug("Waiting for ledgers to be marked as under replicated"); assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); @@ -516,9 +546,10 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerList.contains(ledgerId)); Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie + shutdownLatch.await(); + assertTrue("Bookie " + shutdownBookieRef.get() + "is not listed in the ledger as missing replica :" + data, - data.contains(shutdownBookie)); + data.contains(shutdownBookieRef.get())); } @Test @@ -605,7 +636,12 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); // shutdown a non auditor bookie; choosing non-auditor to avoid another election - String shutdownBookie = shutDownNonAuditorBookie(); + new Thread(() -> { + try { + shutDownNonAuditorBookie(); + } catch (Exception ignore) { + } + }).start(); LOG.debug("Waiting for ledgers to be marked as under replicated"); assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); @@ -652,7 +688,12 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay); // shutdown a non auditor bookie; choosing non-auditor to avoid another 
election - String shutdownBookie = shutDownNonAuditorBookie(); + new Thread(() -> { + try { + shutDownNonAuditorBookie(); + } catch (Exception ignore) { + } + }).start(); LOG.debug("Waiting for ledgers to be marked as under replicated"); assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS)); @@ -700,8 +741,17 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { // wait for 10 seconds before starting the recovery work when a bookie fails urLedgerMgr.setLostBookieRecoveryDelay(10); - // shutdown a non auditor bookie to avoid an election - String shutdownBookie1 = shutDownNonAuditorBookie(); + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>(); + CountDownLatch shutdownLatch1 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie1 = shutDownNonAuditorBookie(); + shutdownBookieRef1.set(shutdownBookie1); + shutdownLatch1.countDown(); + } catch (Exception ignore) { + } + }).start(); // wait for 3 seconds and there shouldn't be any under replicated ledgers // because we have delayed the start of audit by 10 seconds @@ -713,7 +763,16 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { // the history about having delayed recovery remains. Hence we make sure // we bring down a non auditor bookie. 
This should cause the audit to take // place immediately and not wait for the remaining 7 seconds to elapse - String shutdownBookie2 = shutDownNonAuditorBookie(); + AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>(); + CountDownLatch shutdownLatch2 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie2 = shutDownNonAuditorBookie(); + shutdownBookieRef2.set(shutdownBookie2); + shutdownLatch2.countDown(); + } catch (Exception ignore) { + } + }).start(); // 2 second grace period for the ledgers to get reported as under replicated Thread.sleep(2000); @@ -726,9 +785,11 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerList.contains(ledgerId)); Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie1 + shutdownBookie2 + shutdownLatch1.await(); + shutdownLatch2.await(); + assertTrue("Bookie " + shutdownBookieRef1.get() + shutdownBookieRef2.get() + " are not listed in the ledger as missing replicas :" + data, - data.contains(shutdownBookie1) && data.contains(shutdownBookie2)); + data.contains(shutdownBookieRef1.get()) && data.contains(shutdownBookieRef2.get())); } /** @@ -756,7 +817,17 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { // shutdown a non auditor bookie to avoid an election int idx1 = getShutDownNonAuditorBookieIdx(""); ServerConfiguration conf1 = confByIndex(idx1); - String shutdownBookie1 = shutdownBookie(idx1); + + AtomicReference<String> shutdownBookieRef1 = new AtomicReference<>(); + CountDownLatch shutdownLatch1 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie1 = shutdownBookie(idx1); + shutdownBookieRef1.set(shutdownBookie1); + shutdownLatch1.countDown(); + } catch (Exception ignore) { + } + }).start(); // wait for 2 seconds and there shouldn't be any under replicated ledgers // because we have delayed the start of audit by 5 seconds 
@@ -769,8 +840,17 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { // Now to simulate the rolling upgrade, bring down a bookie different from // the one we brought down/up above. - String shutdownBookie2 = shutDownNonAuditorBookie(shutdownBookie1); - + // shutdown a non auditor bookie; choosing non-auditor to avoid another election + AtomicReference<String> shutdownBookieRef2 = new AtomicReference<>(); + CountDownLatch shutdownLatch2 = new CountDownLatch(1); + new Thread(() -> { + try { + String shutdownBookie2 = shutDownNonAuditorBookie(); + shutdownBookieRef2.set(shutdownBookie2); + shutdownLatch2.countDown(); + } catch (Exception ignore) { + } + }).start(); // since the first bookie that was brought down/up has come up, there is only // one bookie down at this time. Hence the lost bookie check shouldn't start // immediately; it will start 5 seconds after the second bookie went down @@ -787,11 +867,13 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { urLedgerList.contains(ledgerId)); Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList); String data = urLedgerData.get(ledgerId); - assertTrue("Bookie " + shutdownBookie1 + "wrongly listed as missing the ledger: " + data, - !data.contains(shutdownBookie1)); - assertTrue("Bookie " + shutdownBookie2 + shutdownLatch1.await(); + shutdownLatch2.await(); + assertTrue("Bookie " + shutdownBookieRef1.get() + "wrongly listed as missing the ledger: " + data, + !data.contains(shutdownBookieRef1.get())); + assertTrue("Bookie " + shutdownBookieRef2.get() + " is not listed in the ledger as missing replicas :" + data, - data.contains(shutdownBookie2)); + data.contains(shutdownBookieRef2.get())); LOG.info("*****************Test Complete"); } @@ -952,7 +1034,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase { return auditorElectors.get(bookieAddr).auditor; } - private String shutDownNonAuditorBookie() throws Exception { + private String 
shutDownNonAuditorBookie() throws Exception { // shutdown bookie which is not an auditor int indexOf = indexOfServer(getAuditorBookie()); int bkIndexDownBookie;
