Copilot commented on code in PR #4594:
URL: https://github.com/apache/bookkeeper/pull/4594#discussion_r2127894126
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java:
##########
@@ -177,77 +180,108 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
         long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
                 DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
-        ledgerStorageList = Lists.newArrayList();
+        ExecutorService storageInitExecutor = Executors.newFixedThreadPool(
+                Math.min(ledgerDirsManager.getAllLedgerDirs().size(),
+                        Runtime.getRuntime().availableProcessors() * 2),
+                new DefaultThreadFactory("LedgerStorage-Initializer")
+        );
+
+        ledgerStorageList = new ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), null));
+        CountDownLatch downLatch = new CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size());
+        AtomicInteger initFailed = new AtomicInteger(0);
         for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
-            File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
-            File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
-            // Create a ledger dirs manager for the single directory
-            File[] lDirs = new File[1];
-            // Remove the `/current` suffix which will be appended again by LedgersDirManager
-            lDirs[0] = ledgerDir.getParentFile();
-            LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
-                    NullStatsLogger.INSTANCE);
-
-            // Create a index dirs manager for the single directory
-            File[] iDirs = new File[1];
-            // Remove the `/current` suffix which will be appended again by LedgersDirManager
-            iDirs[0] = indexDir.getParentFile();
-            LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
-                    NullStatsLogger.INSTANCE);
-
-            EntryLogger entrylogger;
-            if (directIOEntryLogger) {
-                long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
-                        conf,
-                        DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
-                        DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
-                long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
-                        conf,
-                        DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
-                        DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
-                int readBufferSize = MB * (int) getLongVariableOrDefault(
-                        conf,
-                        DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
-                        DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
-                int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
-                        conf,
-                        DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
-                        DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
-                Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
-                entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
-                        new DefaultThreadFactory("EntryLoggerWrite"));
-                entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
-                        new DefaultThreadFactory("EntryLoggerFlush"));
-
-                int numReadThreads = conf.getNumReadWorkerThreads();
-                if (numReadThreads == 0) {
-                    numReadThreads = conf.getServerNumIOThreads();
+            int finalI = i;
+            storageInitExecutor.execute(() -> {
+                try {
+                    File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(finalI);
+                    File indexDir = indexDirsManager.getAllLedgerDirs().get(finalI);
+                    // Create a ledger dirs manager for the single directory
+                    File[] lDirs = new File[1];
+                    // Remove the `/current` suffix which will be appended again by LedgersDirManager
+                    lDirs[0] = ledgerDir.getParentFile();
+                    LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
+                            NullStatsLogger.INSTANCE);
+
+                    // Create a index dirs manager for the single directory
+                    File[] iDirs = new File[1];
+                    // Remove the `/current` suffix which will be appended again by LedgersDirManager
+                    iDirs[0] = indexDir.getParentFile();
+                    LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
+                            NullStatsLogger.INSTANCE);
+
+                    EntryLogger entrylogger;
+                    if (directIOEntryLogger) {
+                        long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
+                        long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
+                        int readBufferSize = MB * (int) getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
+                        int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
+                                DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
+                        Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
+                        entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
+                                new DefaultThreadFactory("EntryLoggerWrite"));
+                        entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
+                                new DefaultThreadFactory("EntryLoggerFlush"));
+
+                        int numReadThreads = conf.getNumReadWorkerThreads();
+                        if (numReadThreads == 0) {
+                            numReadThreads = conf.getServerNumIOThreads();
+                        }
+
+                        entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
+                                new NativeIOImpl(),
+                                allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
+                                conf.getEntryLogSizeLimit(),
+                                conf.getNettyMaxFrameSizeBytes() - 500,
+                                perDirectoryTotalWriteBufferSize,
+                                perDirectoryTotalReadBufferSize,
+                                readBufferSize,
+                                numReadThreads,
+                                maxFdCacheTimeSeconds,
+                                slog, statsLogger);
+                    } else {
+                        entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
+                    }
+                    ledgerStorageList.set(finalI, newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
+                            idm, entrylogger,
+                            statsLogger, perDirectoryWriteCacheSize,
+                            perDirectoryReadCacheSize,
+                            readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
+                    ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
+                    if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
+                        idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
+                    }
+                } catch (IOException e) {
+                    log.error("Failed to initialize DbLedgerStorage", e);
+                    initFailed.incrementAndGet();
                 }
+                downLatch.countDown();
+            });
+        }
-                entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
-                        new NativeIOImpl(),
-                        allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
-                        conf.getEntryLogSizeLimit(),
-                        conf.getNettyMaxFrameSizeBytes() - 500,
-                        perDirectoryTotalWriteBufferSize,
-                        perDirectoryTotalReadBufferSize,
-                        readBufferSize,
-                        numReadThreads,
-                        maxFdCacheTimeSeconds,
-                        slog, statsLogger);
-            } else {
-                entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
-            }
-            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
-                    idm, entrylogger,
-                    statsLogger, perDirectoryWriteCacheSize,
-                    perDirectoryReadCacheSize,
-                    readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
-            ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
-            if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
-                idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
-            }
+        log.info("awaiting for DbLedgerStorage initialization");
+        try {
+            downLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to initialize DbLedgerStorage", e);
+        } finally {
+            storageInitExecutor.shutdown();
Review Comment:
After shutting down the executor, consider awaiting its termination to
ensure all tasks have completed gracefully before proceeding.
```suggestion
            storageInitExecutor.shutdown();
            try {
                if (!storageInitExecutor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
                    log.warn("storageInitExecutor did not terminate within the timeout.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for storageInitExecutor to terminate", e);
            }
```
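
As a side note, if Guava is already on the classpath (the removed `Lists.newArrayList()` call suggests it is), a more compact sketch of the same shutdown-and-await pattern could use `MoreExecutors.shutdownAndAwaitTermination` (assumed imports: `com.google.common.util.concurrent.MoreExecutors` and `java.util.concurrent.TimeUnit`):

```java
// Sketch only: disables new submissions, waits up to the timeout (escalating to
// shutdownNow() if needed), re-interrupts the current thread if the wait is
// interrupted, and returns false when the executor did not terminate in time.
if (!MoreExecutors.shutdownAndAwaitTermination(storageInitExecutor, 60, TimeUnit.SECONDS)) {
    log.warn("storageInitExecutor did not terminate within the timeout.");
}
```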
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]