This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit aaf496d006fc70a527dfe47b4c7048de4d327580 Author: AloysZhang <[email protected]> AuthorDate: Sun Jul 31 11:30:45 2022 +0800 issue #2879 : let bookie quit if journal thread exit (#2887) Descriptions of the changes in this PR: fix #2879 This pull request lets the bookie quit when a journal thread exits. ### Motivation As described in #2879, if a bookie has multiple journal directories, it has multiple journal threads. Once a journal thread exits, the bookie becomes unhealthy because all bookie-io threads are blocked; the bookie then stops working, but the process is still alive. This pull request tries to fix this problem. ### Changes Notify the bookie through a JournalAliveListener when a journal thread exits, and let the bookie quit once any journal thread has exited (cherry picked from commit 67208fb74181faa640e793cd5757712fd9b5d9d5) --- .../org/apache/bookkeeper/bookie/BookieImpl.java | 14 +++++--- .../java/org/apache/bookkeeper/bookie/Journal.java | 12 +++++++ .../bookkeeper/bookie/JournalAliveListener.java | 28 +++++++++++++++ .../bookie/BookieMultipleJournalsTest.java | 41 ++++++++++++++++++++++ .../bookkeeper/bookie/datainteg/WriteSetsTest.java | 1 + 5 files changed, 91 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 9bc084ee92..dd5eeb49b7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -50,6 +50,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException; @@ -425,11 +426,13 
@@ public class BookieImpl extends BookieCriticalThread implements Bookie { } } + JournalAliveListener journalAliveListener = + () -> BookieImpl.this.triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION); // instantiate the journals journals = Lists.newArrayList(); for (int i = 0; i < journalDirectories.size(); i++) { journals.add(new Journal(i, journalDirectories.get(i), - conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator)); + conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator, journalAliveListener)); } this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled(); @@ -828,12 +831,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { public int shutdown() { return shutdown(ExitCode.OK); } - // internal shutdown method to let shutdown bookie gracefully // when encountering exception - synchronized int shutdown(int exitCode) { + ReentrantLock lock = new ReentrantLock(true); + int shutdown(int exitCode) { + lock.lock(); try { - if (isRunning()) { // avoid shutdown twice + if (isRunning()) { // the exitCode only set when first shutdown usually due to exception found LOG.info("Shutting down Bookie-{} with exitCode {}", conf.getBookiePort(), exitCode); @@ -854,7 +858,6 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { for (Journal journal : journals) { journal.shutdown(); } - this.join(); // Shutdown the EntryLogger which has the GarbageCollector Thread running ledgerStorage.shutdown(); @@ -871,6 +874,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { LOG.error("Got Exception while trying to shutdown Bookie", e); throw e; } finally { + lock.unlock(); // setting running to false here, so watch thread // in bookie server know it only after bookie shut down stateManager.close(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 
a4c91e9483..193b557312 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -687,6 +687,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { // Expose Stats private final JournalStats journalStats; + private JournalAliveListener journalAliveListener; + public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) { this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE, @@ -767,6 +769,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { () -> memoryLimitController.currentUsage()); } + public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf, + LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, + ByteBufAllocator allocator, JournalAliveListener journalAliveListener) { + this(journalIndex, journalDirectory, conf, ledgerDirsManager, statsLogger, allocator); + this.journalAliveListener = journalAliveListener; + } + JournalStats getJournalStats() { return this.journalStats; } @@ -1227,6 +1236,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { // close will flush the file system cache making any previous // cached writes durable so this is fine as well. 
IOUtils.close(LOG, bc); + if (journalAliveListener != null) { + journalAliveListener.onJournalExit(); + } } LOG.info("Journal exited loop!"); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java new file mode 100644 index 0000000000..ef73edc0ea --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalAliveListener.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie; + +/** + * Listener for journal alive. 
+ * */ +public interface JournalAliveListener { + void onJournalExit(); +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java index bc30246637..a6a9a67e70 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java @@ -21,8 +21,10 @@ package org.apache.bookkeeper.bookie; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.io.File; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -31,7 +33,9 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; import org.junit.Test; /** @@ -57,6 +61,43 @@ public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase { return conf; } + @Test + @SuppressWarnings("unchecked") + public void testJournalExit() throws Exception { + + LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]); + for (int i = 0; i < 10; i++) { + ledgerHandle.addEntry(("entry-" + i).getBytes()); + } + + BookieServer bookieServer = serverByIndex(0); + BookieImpl bookie = (BookieImpl) bookieServer.getBookie(); + Field journalList = bookie.getClass().getDeclaredField("journals"); + journalList.setAccessible(true); + List<Journal> journals = (List<Journal>) journalList.get(bookie); + journals.get(0).interrupt(); + Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); + } + + @Test + 
@SuppressWarnings("unchecked") + public void testJournalExitAndShutdown() throws Exception { + + LedgerHandle ledgerHandle = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]); + for (int i = 0; i < 10; i++) { + ledgerHandle.addEntry(("entry-" + i).getBytes()); + } + + BookieServer bookieServer = serverByIndex(0); + BookieImpl bookie = (BookieImpl) bookieServer.getBookie(); + Field journalList = bookie.getClass().getDeclaredField("journals"); + journalList.setAccessible(true); + List<Journal> journals = (List<Journal>) journalList.get(bookie); + journals.get(0).interrupt(); + bookie.shutdown(ExitCode.OK); + Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); + } + @Test public void testMultipleWritesAndBookieRestart() throws Exception { // Creates few ledgers so that writes are spread across all journals diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java index 1a82b0bde0..139351b950 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/datainteg/WriteSetsTest.java @@ -158,6 +158,7 @@ public class WriteSetsTest { } } + @SuppressWarnings("deprecation") private static void assertContentsMatch(ImmutableList<Integer> writeSet, DistributionSchedule.WriteSet distWriteSet) throws Exception {
