kfaraz commented on code in PR #18489:
URL: https://github.com/apache/druid/pull/18489#discussion_r2325410632


##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -181,90 +185,120 @@ public boolean canHandleSegments()
   }
 
   @Override
-  public List<DataSegment> getCachedSegments() throws IOException
+  public Collection<DataSegment> getCachedSegments() throws IOException
   {
     if (!canHandleSegments()) {
       throw DruidException.defensive(
           "canHandleSegments() is false. getCachedSegments() must be invoked 
only when canHandleSegments() returns true."
       );
     }
-    final File infoDir = getEffectiveInfoDir();
-    FileUtils.mkdirp(infoDir);
 
-    final List<DataSegment> cachedSegments = new ArrayList<>();
-    final File[] segmentsToLoad = infoDir.listFiles();
+    final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+    final ConcurrentLinkedQueue<DataSegment> cachedSegments = new 
ConcurrentLinkedQueue<>();
 
-    int ignored = 0;
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Retrieving [%d] cached segment metadata files to cache.", 
segmentsToLoad.length);
 
-    for (int i = 0; i < segmentsToLoad.length; i++) {
-      final File file = segmentsToLoad[i];
-      log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
-      try {
-        final DataSegment segment = jsonMapper.readValue(file, 
DataSegment.class);
-        boolean removeInfo = false;
-        if (!segment.getId().toString().equals(file.getName())) {
-          log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), 
segment.getId());
-          ignored++;
-        } else {
-          removeInfo = true;
-          final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
-          for (StorageLocation location : locations) {
-            // check for migrate from old nested local storage path format
-            final File legacyPath = new File(location.getPath(), 
DataSegmentPusher.getDefaultStorageDir(segment, false));
-            if (legacyPath.exists()) {
-              final File destination = 
cacheEntry.toPotentialLocation(location.getPath());
-              FileUtils.mkdirp(destination);
-              final File[] oldFiles = legacyPath.listFiles();
-              final File[] newFiles = destination.listFiles();
-              // make sure old files exist and new files do not exist
-              if (oldFiles != null && oldFiles.length > 0 && newFiles != null 
&& newFiles.length == 0) {
-                Files.move(legacyPath.toPath(), destination.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
-              }
-              cleanupLegacyCacheLocation(location.getPath(), legacyPath);
-            }
+    AtomicInteger ignoredfileCounter = new AtomicInteger(0);
+    CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+    ExecutorService exec = Objects.requireNonNullElseGet(loadOnBootstrapExec, 
MoreExecutors::newDirectExecutorService);
 
-            if (cacheEntry.checkExists(location.getPath())) {
-              removeInfo = false;
-              final boolean reserveResult;
-              if (config.isVirtualStorage()) {
-                reserveResult = location.reserveWeak(cacheEntry);
-              } else {
-                reserveResult = location.reserve(cacheEntry);
-              }
-              if (!reserveResult) {
-                log.makeAlert(
-                    "storage[%s:%,d] has more segments than it is allowed. 
Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations 
maxSize param",
-                    location.getPath(),
-                    location.availableSizeBytes(),
-                    segment.getId(),
-                    segment.getSize()
-                ).emit();
-              }
-              cachedSegments.add(segment);
-            }
+    for (File file : segmentsToLoad) {
+      exec.submit(() -> {
+        addFilesToCachedSegments(file, ignoredfileCounter, cachedSegments);
+        latch.countDown();
+      });
+    }
+
+    try {
+      latch.await();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.makeAlert(e, "Interrupted when trying to retrieve cached segment 
metadata files");
+    }
+
+    stopwatch.stop();
+    log.info("Retrieved [%d,%d] cached segments in [%d]ms.", 
cachedSegments.size(), segmentsToLoad.length, stopwatch.millisElapsed());
+
+    if (ignoredfileCounter.get() > 0) {
+      log.makeAlert("Ignored misnamed segment cache files on startup.")
+         .addData("numIgnored", ignoredfileCounter.get())
+         .emit();
+    }
+
+    return cachedSegments;
+  }
+
+  private File[] retrieveSegmentMetadataFiles() throws IOException
+  {
+    File baseDir = config.getInfoDir();

Review Comment:
   The original code used `getEffectiveInfoDir()` (and ensured the directory existed via `FileUtils.mkdirp`), whereas the new `retrieveSegmentMetadataFiles()` reads `config.getInfoDir()` directly. Is this change intentional, or should the helper also call `getEffectiveInfoDir()` to preserve the previous directory-resolution behavior?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to