This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 34bd4b38fa Upgrade mockito from 3.12.4 to 4.11.0 (#4218)
34bd4b38fa is described below
commit 34bd4b38fa13f105608ae4e7b4c749124c271e1e
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Feb 21 23:39:28 2024 +0800
Upgrade mockito from 3.12.4 to 4.11.0 (#4218)
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 26 ++++++++++++---
.../java/org/apache/bookkeeper/bookie/Journal.java | 38 +++++++++++++++-------
.../bookie/BookieMultipleJournalsTest.java | 4 +--
pom.xml | 2 +-
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 654ace4547..a660a13ce8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
/**
* Implements a bookie.
*/
-public class BookieImpl extends BookieCriticalThread implements Bookie {
+public class BookieImpl implements Bookie {
private static final Logger LOG = LoggerFactory.getLogger(Bookie.class);
@@ -119,6 +119,8 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
protected StateManager stateManager;
+ private BookieCriticalThread bookieThread;
+
// Expose Stats
final StatsLogger statsLogger;
private final BookieStats bookieStats;
@@ -390,7 +392,6 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
ByteBufAllocator allocator,
Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, InterruptedException, BookieException {
- super("Bookie-" + conf.getBookiePort());
this.bookieServiceInfoProvider = bookieServiceInfoProvider;
this.statsLogger = statsLogger;
this.conf = conf;
@@ -656,7 +657,9 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
@Override
public synchronized void start() {
- setDaemon(true);
+ bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort());
+ bookieThread.setDaemon(true);
+
ThreadRegistry.register("BookieThread", 0);
if (LOG.isDebugEnabled()) {
LOG.debug("I'm starting a bookie with journal directories {}",
@@ -717,7 +720,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
syncThread.start();
// start bookie thread
- super.start();
+ bookieThread.start();
// After successful bookie startup, register listener for disk
// error/full notifications.
@@ -741,6 +744,20 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
}
}
+ @Override
+ public void join() throws InterruptedException {
+ if (bookieThread != null) {
+ bookieThread.join();
+ }
+ }
+
+ public boolean isAlive() {
+ if (bookieThread == null) {
+ return false;
+ }
+ return bookieThread.isAlive();
+ }
+
/*
* Get the DiskFailure listener for the bookie
*/
@@ -824,7 +841,6 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
return stateManager.isRunning();
}
- @Override
public void run() {
// start journals
for (Journal journal: journals) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 3b730cb476..df10dfe522 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
@@ -66,13 +67,15 @@ import org.slf4j.LoggerFactory;
/**
* Provide journal related management.
*/
-public class Journal extends BookieCriticalThread implements CheckpointSource {
+public class Journal implements CheckpointSource {
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
private static final RecyclableArrayList.Recycler<QueueEntry>
entryListRecycler =
new RecyclableArrayList.Recycler<QueueEntry>();
+ private BookieCriticalThread thread;
+
/**
* Filter to pickup journals.
*/
@@ -461,13 +464,13 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
volatile boolean running = true;
// This holds the queue entries that should be notified after a
// successful force write
- Thread threadToNotifyOnEx;
+ Consumer<Void> threadToNotifyOnEx;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
- public ForceWriteThread(Thread threadToNotifyOnEx,
+ public ForceWriteThread(Consumer<Void> threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
super("ForceWriteThread");
@@ -531,7 +534,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// Regardless of what caused us to exit, we should notify the
// the parent thread as it should either exit or be in the process
// of exiting else we will have write requests hang
- threadToNotifyOnEx.interrupt();
+ threadToNotifyOnEx.accept(null);
}
private void syncJournal(ForceWriteRequest lastRequest) throws
IOException {
@@ -652,8 +655,6 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
public Journal(int journalIndex, File journalDirectory,
ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger,
ByteBufAllocator allocator) {
- super(journalThreadName + "-" + conf.getBookiePort());
- this.setPriority(Thread.MAX_PRIORITY);
this.allocator = allocator;
StatsLogger journalStatsLogger =
statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex));
@@ -678,8 +679,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.syncData = conf.getJournalSyncData();
this.maxBackupJournals = conf.getMaxBackupJournals();
- this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(),
- journalStatsLogger);
+ this.forceWriteThread = new ForceWriteThread((__) -> this.interruptThread(),
+ conf.getJournalAdaptiveGroupWrites(), journalStatsLogger);
this.maxGroupWaitInNanos =
TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold =
conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold =
conf.getJournalBufferedEntriesThreshold();
@@ -956,7 +957,6 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
* </p>
* @see org.apache.bookkeeper.bookie.SyncThread
*/
- @Override
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
ThreadRegistry.register(journalThreadName, 0);
@@ -1255,8 +1255,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
forceWriteThread.shutdown();
running = false;
- this.interrupt();
- this.join();
+ this.interruptThread();
+ this.joinThread();
LOG.info("Finished Shutting down Journal thread");
} catch (IOException | InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -1284,7 +1284,21 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
*/
@VisibleForTesting
public void joinThread() throws InterruptedException {
- join();
+ if (thread != null) {
+ thread.join();
+ }
+ }
+
+ public void interruptThread() {
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+
+ public synchronized void start() {
+ thread = new BookieCriticalThread(() -> run(), journalThreadName + "-" + conf.getBookiePort());
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.start();
}
long getMemoryUsage() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
index b3622b3bb6..664e31541b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java
@@ -74,7 +74,7 @@ public class BookieMultipleJournalsTest extends
BookKeeperClusterTestCase {
Field journalList = bookie.getClass().getDeclaredField("journals");
journalList.setAccessible(true);
List<Journal> journals = (List<Journal>) journalList.get(bookie);
- journals.get(0).interrupt();
+ journals.get(0).interruptThread();
Awaitility.await().untilAsserted(() ->
assertFalse(bookie.isRunning()));
}
@@ -92,7 +92,7 @@ public class BookieMultipleJournalsTest extends
BookKeeperClusterTestCase {
Field journalList = bookie.getClass().getDeclaredField("journals");
journalList.setAccessible(true);
List<Journal> journals = (List<Journal>) journalList.get(bookie);
- journals.get(0).interrupt();
+ journals.get(0).interruptThread();
bookie.shutdown(ExitCode.OK);
Awaitility.await().untilAsserted(() ->
assertFalse(bookie.isRunning()));
}
diff --git a/pom.xml b/pom.xml
index 69d1bbe56b..215700bce0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
<lombok.version>1.18.30</lombok.version>
<log4j.version>2.18.0</log4j.version>
<lz4.version>1.3.0</lz4.version>
- <mockito.version>3.12.4</mockito.version>
+ <mockito.version>4.11.0</mockito.version>
<netty.version>4.1.104.Final</netty.version>
<netty-iouring.version>0.0.24.Final</netty-iouring.version>
<ostrich.version>9.1.3</ostrich.version>