Copilot commented on code in PR #4594:
URL: https://github.com/apache/bookkeeper/pull/4594#discussion_r2127894126


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java:
##########
@@ -177,77 +180,108 @@ public void initialize(ServerConfiguration conf, 
LedgerManager ledgerManager, Le
         long readAheadCacheBatchBytesSize = 
conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
                 DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
 
-        ledgerStorageList = Lists.newArrayList();
+        ExecutorService storageInitExecutor = Executors.newFixedThreadPool(
+                Math.min(ledgerDirsManager.getAllLedgerDirs().size(),
+                        Runtime.getRuntime().availableProcessors() * 2),
+                new DefaultThreadFactory("LedgerStorage-Initializer")
+        );
+
+        ledgerStorageList = new 
ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), 
null));
+        CountDownLatch downLatch = new 
CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size());
+        AtomicInteger initFailed = new AtomicInteger(0);
         for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
-            File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
-            File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
-            // Create a ledger dirs manager for the single directory
-            File[] lDirs = new File[1];
-            // Remove the `/current` suffix which will be appended again by 
LedgersDirManager
-            lDirs[0] = ledgerDir.getParentFile();
-            LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, 
ledgerDirsManager.getDiskChecker(),
-                    NullStatsLogger.INSTANCE);
-
-            // Create a index dirs manager for the single directory
-            File[] iDirs = new File[1];
-            // Remove the `/current` suffix which will be appended again by 
LedgersDirManager
-            iDirs[0] = indexDir.getParentFile();
-            LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, 
indexDirsManager.getDiskChecker(),
-                    NullStatsLogger.INSTANCE);
-
-            EntryLogger entrylogger;
-            if (directIOEntryLogger) {
-                long perDirectoryTotalWriteBufferSize = MB * 
getLongVariableOrDefault(
-                    conf,
-                    DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
-                    DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / 
numberOfDirs;
-                long perDirectoryTotalReadBufferSize = MB * 
getLongVariableOrDefault(
-                    conf,
-                    DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
-                    DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
-                int readBufferSize = MB * (int) getLongVariableOrDefault(
-                    conf,
-                    DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
-                    DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
-                int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
-                    conf,
-                    DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
-                    DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
-                Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
-                entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
-                    new DefaultThreadFactory("EntryLoggerWrite"));
-                entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
-                    new DefaultThreadFactory("EntryLoggerFlush"));
-
-                int numReadThreads = conf.getNumReadWorkerThreads();
-                if (numReadThreads == 0) {
-                    numReadThreads = conf.getServerNumIOThreads();
+            int finalI = i;
+            storageInitExecutor.execute(() -> {
+                try {
+                    File ledgerDir = 
ledgerDirsManager.getAllLedgerDirs().get(finalI);
+                    File indexDir = 
indexDirsManager.getAllLedgerDirs().get(finalI);
+                    // Create a ledger dirs manager for the single directory
+                    File[] lDirs = new File[1];
+                    // Remove the `/current` suffix which will be appended 
again by LedgersDirManager
+                    lDirs[0] = ledgerDir.getParentFile();
+                    LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, 
ledgerDirsManager.getDiskChecker(),
+                            NullStatsLogger.INSTANCE);
+
+                    // Create a index dirs manager for the single directory
+                    File[] iDirs = new File[1];
+                    // Remove the `/current` suffix which will be appended 
again by LedgersDirManager
+                    iDirs[0] = indexDir.getParentFile();
+                    LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, 
indexDirsManager.getDiskChecker(),
+                            NullStatsLogger.INSTANCE);
+
+                    EntryLogger entrylogger;
+                    if (directIOEntryLogger) {
+                        long perDirectoryTotalWriteBufferSize = MB * 
getLongVariableOrDefault(
+                                conf,
+                                
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / 
numberOfDirs;
+                        long perDirectoryTotalReadBufferSize = MB * 
getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / 
numberOfDirs;
+                        int readBufferSize = MB * (int) 
getLongVariableOrDefault(
+                                conf,
+                                DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
+                                DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
+                        int maxFdCacheTimeSeconds = (int) 
getLongVariableOrDefault(
+                                conf,
+                                
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
+                                DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
+                        Slf4jSlogger slog = new 
Slf4jSlogger(DbLedgerStorage.class);
+                        entryLoggerWriteExecutor = 
Executors.newSingleThreadExecutor(
+                                new DefaultThreadFactory("EntryLoggerWrite"));
+                        entryLoggerFlushExecutor = 
Executors.newSingleThreadExecutor(
+                                new DefaultThreadFactory("EntryLoggerFlush"));
+
+                        int numReadThreads = conf.getNumReadWorkerThreads();
+                        if (numReadThreads == 0) {
+                            numReadThreads = conf.getServerNumIOThreads();
+                        }
+
+                        entrylogger = new DirectEntryLogger(ledgerDir, new 
EntryLogIdsImpl(ldm, slog),
+                                new NativeIOImpl(),
+                                allocator, entryLoggerWriteExecutor, 
entryLoggerFlushExecutor,
+                                conf.getEntryLogSizeLimit(),
+                                conf.getNettyMaxFrameSizeBytes() - 500,
+                                perDirectoryTotalWriteBufferSize,
+                                perDirectoryTotalReadBufferSize,
+                                readBufferSize,
+                                numReadThreads,
+                                maxFdCacheTimeSeconds,
+                                slog, statsLogger);
+                    } else {
+                        entrylogger = new DefaultEntryLogger(conf, ldm, null, 
statsLogger, allocator);
+                    }
+                    ledgerStorageList.set(finalI, 
newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
+                            idm, entrylogger,
+                            statsLogger, perDirectoryWriteCacheSize,
+                            perDirectoryReadCacheSize,
+                            readAheadCacheBatchSize, 
readAheadCacheBatchBytesSize));
+                    
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
+                    if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
+                        
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
+                    }
+                } catch (IOException e) {
+                    log.error("Failed to initialize DbLedgerStorage", e);
+                    initFailed.incrementAndGet();
                 }
+                downLatch.countDown();
+            });
+        }
 
-                entrylogger = new DirectEntryLogger(ledgerDir, new 
EntryLogIdsImpl(ldm, slog),
-                    new NativeIOImpl(),
-                    allocator, entryLoggerWriteExecutor, 
entryLoggerFlushExecutor,
-                    conf.getEntryLogSizeLimit(),
-                    conf.getNettyMaxFrameSizeBytes() - 500,
-                    perDirectoryTotalWriteBufferSize,
-                    perDirectoryTotalReadBufferSize,
-                    readBufferSize,
-                    numReadThreads,
-                    maxFdCacheTimeSeconds,
-                    slog, statsLogger);
-            } else {
-                entrylogger = new DefaultEntryLogger(conf, ldm, null, 
statsLogger, allocator);
-            }
-            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, 
ledgerManager, ldm,
-                idm, entrylogger,
-                statsLogger, perDirectoryWriteCacheSize,
-                perDirectoryReadCacheSize,
-                readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
-            
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
-            if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
-                
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
-            }
+        log.info("awaiting for DbLedgerStorage initialization");
+        try {
+            downLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to initialize DbLedgerStorage", e);
+        } finally {
+            storageInitExecutor.shutdown();

Review Comment:
   After shutting down the executor, consider awaiting its termination to 
ensure all tasks have completed gracefully before proceeding.
   ```suggestion
               storageInitExecutor.shutdown();
               try {
                   if (!storageInitExecutor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
                       log.warn("storageInitExecutor did not terminate within the timeout.");
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   log.error("Interrupted while waiting for storageInitExecutor to terminate", e);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to