This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1d82b9278c Fix ThreadRegistry#register behavior to ensure correct
Prom metrics (#4300)
1d82b9278c is described below
commit 1d82b9278cc8916ca4410bd38f8fea50bfc41aaa
Author: Michael Marshall <[email protected]>
AuthorDate: Wed May 22 04:23:51 2024 -0500
Fix ThreadRegistry#register behavior to ensure correct Prom metrics
(#4300)
* Make tests fail
* Fix ThreadRegistry#register to ensure correct Prom metrics
* Change style to match BK standards
* Fix tests
---------
Co-authored-by: Nicolò Boschi <[email protected]>
---
.../testing/executors/MockExecutorController.java | 5 ++
.../org/apache/bookkeeper/bookie/BookieImpl.java | 2 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 4 +-
.../org/apache/bookkeeper/bookie/SyncThread.java | 8 ++-
.../ldb/SingleDirectoryDbLedgerStorage.java | 8 ++-
.../apache/bookkeeper/proto/BookieNettyServer.java | 8 ++-
.../bookie/LedgerStorageCheckpointTest.java | 3 +
.../bookkeeper/test/BookKeeperClusterTestCase.java | 6 ++
.../apache/bookkeeper/stats/ThreadRegistry.java | 69 +++++++++++++++++++++-
.../prometheus/DataSketchesOpStatsLogger.java | 5 ++
.../ThreadScopedDataSketchesStatsLogger.java | 26 +++++++-
11 files changed, 133 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
index 4942e348ba..7299d1d931 100644
---
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/executors/MockExecutorController.java
@@ -42,6 +42,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stats.ThreadRegistry;
import org.mockito.stubbing.Answer;
/**
@@ -172,6 +173,10 @@ public class MockExecutorController {
private static Answer<Future<?>> answerNow() {
return invocationOnMock -> {
+ // This mock executes every task synchronously in the caller thread,
+ // which breaks assertions verifying that a thread is registered
+ // with only one thread pool, so clear any prior registration first.
+
ThreadRegistry.forceClearRegistrationForTests(Thread.currentThread().getId());
Runnable task = invocationOnMock.getArgument(0);
task.run();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 1514a74ad8..a69df4a176 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -660,7 +660,7 @@ public class BookieImpl implements Bookie {
bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" +
conf.getBookiePort());
bookieThread.setDaemon(true);
- ThreadRegistry.register("BookieThread", 0);
+ ThreadRegistry.register("BookieThread", true);
if (LOG.isDebugEnabled()) {
LOG.debug("I'm starting a bookie with journal directories {}",
journalDirectories.stream().map(File::getName).collect(Collectors.joining(",
")));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index de22bf416a..2d68b8d2da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -482,7 +482,7 @@ public class Journal implements CheckpointSource {
@Override
public void run() {
LOG.info("ForceWrite Thread started");
- ThreadRegistry.register(super.getName(), 0);
+ ThreadRegistry.register(super.getName());
if (conf.isBusyWaitEnabled()) {
try {
@@ -955,7 +955,7 @@ public class Journal implements CheckpointSource {
*/
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
- ThreadRegistry.register(journalThreadName, 0);
+ ThreadRegistry.register(journalThreadName);
if (conf.isBusyWaitEnabled()) {
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index c3a4c8fc71..3b77cf45f1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -82,12 +82,16 @@ class SyncThread implements Checkpointer {
this.checkpointSource = checkpointSource;
this.executor = newExecutor();
this.syncExecutorTime =
statsLogger.getThreadScopedCounter("sync-thread-time");
- this.executor.submit(() -> ThreadRegistry.register(executorName, 0));
}
@VisibleForTesting
static ScheduledExecutorService newExecutor() {
- return Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory(executorName));
+ return Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory(executorName) {
+ @Override
+ protected Thread newThread(Runnable r, String name) {
+ return super.newThread(ThreadRegistry.registerThread(r,
executorName), name);
+ }
+ });
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 405ffe6f11..35d4c8caf2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -117,7 +117,12 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
private static String dbStoragerExecutorName = "db-storage";
private final ExecutorService executor = Executors.newSingleThreadExecutor(
- new DefaultThreadFactory(dbStoragerExecutorName));
+ new DefaultThreadFactory(dbStoragerExecutorName) {
+ @Override
+ protected Thread newThread(Runnable r, String name) {
+ return super.newThread(ThreadRegistry.registerThread(r,
dbStoragerExecutorName), name);
+ }
+ });
// Executor used to for db index cleanup
private final ScheduledExecutorService cleanupExecutor = Executors
@@ -218,7 +223,6 @@ public class SingleDirectoryDbLedgerStorage implements
CompactableLedgerStorage
flushExecutorTime =
ledgerIndexDirStatsLogger.getThreadScopedCounter("db-storage-thread-time");
executor.submit(() -> {
- ThreadRegistry.register(dbStoragerExecutorName, 0);
// ensure the metric gets registered on start-up as this thread
only executes
// when the write cache is full which may not happen or not for a
long time
flushExecutorTime.addLatency(0, TimeUnit.NANOSECONDS);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index a5c4b16293..bfad643d1e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -79,6 +79,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.zookeeper.KeeperException;
@@ -122,7 +123,12 @@ class BookieNettyServer {
if (!conf.isDisableServerSocketBind()) {
this.eventLoopGroup = EventLoopUtil.getServerEventLoopGroup(conf,
- new DefaultThreadFactory("bookie-io"));
+ new DefaultThreadFactory("bookie-io") {
+ @Override
+ protected Thread newThread(Runnable r, String name) {
+ // Register each I/O thread under this factory's pool name ("bookie-io").
+ return super.newThread(ThreadRegistry.registerThread(r, "bookie-io"), name);
+ }
+ });
this.acceptorGroup = EventLoopUtil.getServerAcceptorGroup(conf,
new DefaultThreadFactory("bookie-acceptor"));
allChannels = new CleanupChannelGroup(eventLoopGroup);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index c468d2c2dc..510b7ab975 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -56,6 +56,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.PortManager;
@@ -97,6 +98,7 @@ public class LedgerStorageCheckpointTest {
@Before
public void setUp() throws Exception {
+ ThreadRegistry.clear();
LOG.info("Setting up test {}", getClass());
try {
@@ -128,6 +130,7 @@ public class LedgerStorageCheckpointTest {
@After
public void tearDown() throws Exception {
+ ThreadRegistry.clear();
LOG.info("TearDown");
sortedLedgerStorageMockedStatic.close();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 35c1ea2aeb..83893922d6 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -76,6 +76,7 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.ThreadRegistry;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.zookeeper.KeeperException;
@@ -242,6 +243,11 @@ public abstract class BookKeeperClusterTestCase {
}
}
+ @After
+ public void clearMetricsThreadRegistry() throws Exception {
+ ThreadRegistry.clear();
+ }
+
/**
* Start zookeeper cluster.
*
diff --git
a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java
b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java
index e890660ebb..5bc7259a09 100644
---
a/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java
+++
b/stats/bookkeeper-stats-api/src/main/java/org/apache/bookkeeper/stats/ThreadRegistry.java
@@ -18,6 +18,9 @@ package org.apache.bookkeeper.stats;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* For mapping thread ids to thread pools and threads within those pools
* or just for lone named threads. Thread scoped metrics add labels to
@@ -25,7 +28,42 @@ import java.util.concurrent.ConcurrentMap;
* For flexibility, this registry is not based on TLS.
*/
public class ThreadRegistry {
+ private static Logger logger =
LoggerFactory.getLogger(ThreadRegistry.class);
private static ConcurrentMap<Long, ThreadPoolThread> threadPoolMap = new
ConcurrentHashMap<>();
+ private static ConcurrentMap<String, Integer> threadPoolThreadMap = new
ConcurrentHashMap<>();
+
+ /*
+ Threads can register themselves as their first act before carrying out
+ any work. Each call assigns the calling thread the next ordinal
+ (starting at 0) for the given thread pool.
+ */
+ public static void register(String threadPool) {
+ register(threadPool, false);
+ }
+
+ public static void register(String threadPool, boolean force) {
+ Integer threadPoolThread = threadPoolThreadMap.compute(threadPool, (k,
v) -> v == null ? 0 : v + 1);
+ if (force) {
+ threadPoolMap.remove(Thread.currentThread().getId());
+ }
+ register(threadPool, threadPoolThread, Thread.currentThread().getId());
+ }
+
+ /**
+ * In some tests, activities that should run in different threads from
+ * different thread pools are executed in the same thread, which would
+ * cause the registration assertions to fail.
+ * This is a convenience method to work around such cases.
+ * This method shouldn't be used in production code.
+ */
+ public static void forceClearRegistrationForTests(long threadId) {
+ threadPoolMap.compute(threadId, (id, value) -> {
+ if (value != null) {
+ logger.info("Forcibly clearing registry entry {} for thread id
{}", value, id);
+ }
+ return null;
+ });
+ }
/*
Threads can register themselves as their first act before carrying out
@@ -37,10 +75,22 @@ public class ThreadRegistry {
/*
Thread factories can register a thread by its id.
+ The assumption is that one thread belongs only to one threadpool.
+ This doesn't hold in tests, in which we use mock Executors that
+ run the code in the same thread as the caller
*/
public static void register(String threadPool, int threadPoolThread, long
threadId) {
ThreadPoolThread tpt = new ThreadPoolThread(threadPool,
threadPoolThread, threadId);
- threadPoolMap.put(threadId, tpt);
+ ThreadPoolThread previous = threadPoolMap.put(threadId, tpt);
+ if (previous != null) {
+ throw new IllegalStateException("Thread " + threadId + " was
already registered in thread pool "
+ + previous.threadPool + " as thread " + previous.ordinal +
" with threadId " + previous.threadId
+ + " trying to overwrite with " + threadPool + " and
ordinal " + threadPoolThread);
+ }
+ }
+
+ public static Runnable registerThread(Runnable runnable, String
threadPool) {
+ return new RegisteredRunnable(threadPool, runnable);
}
/*
@@ -48,6 +98,7 @@ public class ThreadRegistry {
*/
public static void clear() {
threadPoolMap.clear();
+ threadPoolThreadMap.clear();
}
/*
@@ -79,4 +130,20 @@ public class ThreadRegistry {
return ordinal;
}
}
+
+ private static class RegisteredRunnable implements Runnable {
+ private final String threadPool;
+ private final Runnable runnable;
+
+ public RegisteredRunnable(String threadPool, Runnable runnable) {
+ this.threadPool = threadPool;
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void run() {
+ register(threadPool);
+ runnable.run();
+ }
+ }
}
diff --git
a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java
b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java
index 015c3d3b24..057fed65c1 100644
---
a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java
+++
b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/DataSketchesOpStatsLogger.java
@@ -212,4 +212,9 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
}
};
}
+
+ @Override
+ public String toString() {
+ return "DataSketchesOpStatsLogger{labels=" + labels + ", id=" +
System.identityHashCode(this) + "}";
+ }
}
diff --git
a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java
b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java
index 53aca52792..cee895b0c5 100644
---
a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java
+++
b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/ThreadScopedDataSketchesStatsLogger.java
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* OpStatsLogger implementation that lazily registers OpStatsLoggers per thread
@@ -29,6 +31,8 @@ import org.apache.bookkeeper.stats.ThreadRegistry;
*/
public class ThreadScopedDataSketchesStatsLogger implements OpStatsLogger {
+ private static Logger logger =
LoggerFactory.getLogger(ThreadScopedDataSketchesStatsLogger.class);
+
private ThreadLocal<DataSketchesOpStatsLogger> statsLoggers;
private DataSketchesOpStatsLogger defaultStatsLogger;
private Map<String, String> originalLabels;
@@ -95,8 +99,18 @@ public class ThreadScopedDataSketchesStatsLogger implements
OpStatsLogger {
if (!statsLogger.isThreadInitialized()) {
ThreadRegistry.ThreadPoolThread tpt = ThreadRegistry.get();
if (tpt == null) {
+ logger.warn("Thread {} was not registered in the thread
registry. Using default stats logger {}.",
+ Thread.currentThread(), defaultStatsLogger);
statsLoggers.set(defaultStatsLogger);
- provider.opStats.put(new ScopeContext(scopeContext.getScope(),
originalLabels), defaultStatsLogger);
+ DataSketchesOpStatsLogger previous = provider.opStats
+ .put(new ScopeContext(scopeContext.getScope(),
originalLabels), defaultStatsLogger);
+ // If we overwrite a logger, metrics will not be collected
correctly
+ if (previous != null && previous != defaultStatsLogger) {
+ logger.error("Invalid state for thead " +
Thread.currentThread() + ". Overwrote a stats logger."
+ + "New is {}, previous was {}",
+ defaultStatsLogger, previous);
+ throw new IllegalStateException("Invalid state. Overwrote
a stats logger.");
+ }
return defaultStatsLogger;
} else {
Map<String, String> threadScopedlabels = new
HashMap<>(originalLabels);
@@ -104,7 +118,15 @@ public class ThreadScopedDataSketchesStatsLogger
implements OpStatsLogger {
threadScopedlabels.put("thread",
String.valueOf(tpt.getOrdinal()));
statsLogger.initializeThread(threadScopedlabels);
- provider.opStats.put(new ScopeContext(scopeContext.getScope(),
threadScopedlabels), statsLogger);
+ DataSketchesOpStatsLogger previous = provider.opStats
+ .put(new ScopeContext(scopeContext.getScope(),
threadScopedlabels), statsLogger);
+ // If we overwrite a logger, metrics will not be collected
correctly
+ if (previous != null && previous != statsLogger) {
+ logger.error("Invalid state for thead " +
Thread.currentThread() + ". Overwrote a stats logger."
+ + "New is {}, previous was {}",
+ defaultStatsLogger, previous);
+ throw new IllegalStateException("Invalid state. Overwrote
a stats logger.");
+ }
}
}