kfaraz commented on code in PR #18489:
URL: https://github.com/apache/druid/pull/18489#discussion_r2629424653


##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws 
IOException
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec 
== null;
+    ExecutorService executorService = 
createdNewExecutorServiceToLoadSegmentCache
+                                      ? 
MoreExecutors.newDirectExecutorService()
+                                      : loadOnBootstrapExec;
 
     AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
 
-    for (int i = 0; i < segmentsToLoad.length; i++) {
-      final File file = segmentsToLoad[i];
-      log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
-      try {
-        addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to load segment from segment cache file.")
-           .addData("file", file)
-           .emit();
-      }
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Retrieving [%d] cached segment metadata files to cache.", 
segmentsToLoad.length);
+
+    for (File file : segmentsToLoad) {
+      executorService.submit(() -> {
+        try {
+          addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Failed to load segment from segment cache file.")
+             .addData("file", file)
+             .emit();
+        }
+        finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    try {
+      latch.await();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.error(e, "Interrupted when trying to retrieve cached segment 
metadata files");

Review Comment:
   Since it was interrupted, I think we can skip logging the stack trace.
   ```suggestion
         log.noStackTrace().error(e, "Interrupted when trying to retrieve 
cached segment metadata files");
   ```



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws 
IOException
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec 
== null;
+    ExecutorService executorService = 
createdNewExecutorServiceToLoadSegmentCache

Review Comment:
   Nit: Making these variables `final` would be nice.



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws 
IOException
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec 
== null;
+    ExecutorService executorService = 
createdNewExecutorServiceToLoadSegmentCache
+                                      ? 
MoreExecutors.newDirectExecutorService()
+                                      : loadOnBootstrapExec;
 
     AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
 
-    for (int i = 0; i < segmentsToLoad.length; i++) {
-      final File file = segmentsToLoad[i];
-      log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
-      try {
-        addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to load segment from segment cache file.")
-           .addData("file", file)
-           .emit();
-      }
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Retrieving [%d] cached segment metadata files to cache.", 
segmentsToLoad.length);
+
+    for (File file : segmentsToLoad) {
+      executorService.submit(() -> {
+        try {
+          addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Failed to load segment from segment cache file.")
+             .addData("file", file)
+             .emit();
+        }
+        finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    try {
+      latch.await();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.error(e, "Interrupted when trying to retrieve cached segment 
metadata files");
+    }
+
+    stopwatch.stop();
+    log.info("Retrieved [%d,%d] cached segments in [%d]ms.", 
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());

Review Comment:
   ```suggestion
       log.info("Loaded [%d/%d] cached segments in [%d]ms.", 
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
   ```



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws 
IOException
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec 
== null;
+    ExecutorService executorService = 
createdNewExecutorServiceToLoadSegmentCache
+                                      ? 
MoreExecutors.newDirectExecutorService()
+                                      : loadOnBootstrapExec;
 
     AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
 
-    for (int i = 0; i < segmentsToLoad.length; i++) {
-      final File file = segmentsToLoad[i];
-      log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
-      try {
-        addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to load segment from segment cache file.")
-           .addData("file", file)
-           .emit();
-      }
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Retrieving [%d] cached segment metadata files to cache.", 
segmentsToLoad.length);

Review Comment:
   ```suggestion
       log.info("Loading [%d] segments from disk to cache.", 
segmentsToLoad.length);
   ```



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -225,22 +228,49 @@ public List<DataSegment> getCachedSegments() throws 
IOException
       }
     }
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
     final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+
+    boolean createdNewExecutorServiceToLoadSegmentCache = loadOnBootstrapExec 
== null;

Review Comment:
   ```suggestion
       // If there is no dedicated bootstrap executor, perform the loading 
sequentially on the current thread
       boolean createdNewExecutorServiceToLoadSegmentCache = 
loadOnBootstrapExec == null;
   ```



##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -249,10 +279,14 @@ public List<DataSegment> getCachedSegments() throws 
IOException
          .emit();
     }
 
-    return cachedSegments;
+    return new ArrayList<>(cachedSegments);

Review Comment:
   Return an immutable list:
   ```suggestion
       return List.copyOf(cachedSegments);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to