This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 09881574c3a Refactor segmentLocalCacheManager (#18494)
09881574c3a is described below

commit 09881574c3a653e83d4bb47d44ddd2bf86f70d86
Author: Virushade <[email protected]>
AuthorDate: Wed Oct 22 10:34:28 2025 +0800

    Refactor segmentLocalCacheManager (#18494)
    
    * Refactor segmentLocalCacheManager
    
    * Try catch for addFilesToCachedSegments
---
 .../segment/loading/SegmentLocalCacheManager.java  | 124 +++++++++++----------
 1 file changed, 68 insertions(+), 56 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 162e20a28d1..104304dce6a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -61,6 +61,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 
@@ -188,68 +189,17 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
           "canHandleSegments() is false. getCachedSegments() must be invoked 
only when canHandleSegments() returns true."
       );
     }
-    final File infoDir = getEffectiveInfoDir();
-    FileUtils.mkdirp(infoDir);
 
     final List<DataSegment> cachedSegments = new ArrayList<>();
-    final File[] segmentsToLoad = infoDir.listFiles();
+    final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
 
-    int ignored = 0;
+    AtomicInteger ignoredFilesCounter = new AtomicInteger(0);
 
     for (int i = 0; i < segmentsToLoad.length; i++) {
       final File file = segmentsToLoad[i];
       log.info("Loading segment cache file [%d/%d][%s].", i + 1, 
segmentsToLoad.length, file);
       try {
-        final DataSegment segment = jsonMapper.readValue(file, 
DataSegment.class);
-        boolean removeInfo = false;
-        if (!segment.getId().toString().equals(file.getName())) {
-          log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), 
segment.getId());
-          ignored++;
-        } else {
-          removeInfo = true;
-          final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
-          for (StorageLocation location : locations) {
-            // check for migrate from old nested local storage path format
-            final File legacyPath = new File(location.getPath(), 
DataSegmentPusher.getDefaultStorageDir(segment, false));
-            if (legacyPath.exists()) {
-              final File destination = 
cacheEntry.toPotentialLocation(location.getPath());
-              FileUtils.mkdirp(destination);
-              final File[] oldFiles = legacyPath.listFiles();
-              final File[] newFiles = destination.listFiles();
-              // make sure old files exist and new files do not exist
-              if (oldFiles != null && oldFiles.length > 0 && newFiles != null 
&& newFiles.length == 0) {
-                Files.move(legacyPath.toPath(), destination.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
-              }
-              cleanupLegacyCacheLocation(location.getPath(), legacyPath);
-            }
-
-            if (cacheEntry.checkExists(location.getPath())) {
-              removeInfo = false;
-              final boolean reserveResult;
-              if (config.isVirtualStorage()) {
-                reserveResult = location.reserveWeak(cacheEntry);
-              } else {
-                reserveResult = location.reserve(cacheEntry);
-              }
-              if (!reserveResult) {
-                log.makeAlert(
-                    "storage[%s:%,d] has more segments than it is allowed. 
Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations 
maxSize param",
-                    location.getPath(),
-                    location.availableSizeBytes(),
-                    segment.getId(),
-                    segment.getSize()
-                ).emit();
-              }
-              cachedSegments.add(segment);
-            }
-          }
-        }
-
-        if (removeInfo) {
-          final SegmentId segmentId = segment.getId();
-          log.warn("Unable to find cache file for segment[%s]. Deleting lookup 
entry.", segmentId);
-          removeInfoFile(segment);
-        }
+        addFilesToCachedSegments(file, ignoredFilesCounter, cachedSegments);
       }
       catch (Exception e) {
         log.makeAlert(e, "Failed to load segment from segment cache file.")
@@ -258,15 +208,77 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       }
     }
 
-    if (ignored > 0) {
+    if (ignoredFilesCounter.get() > 0) {
       log.makeAlert("Ignored misnamed segment cache files on startup.")
-         .addData("numIgnored", ignored)
+         .addData("numIgnored", ignoredFilesCounter.get())
          .emit();
     }
 
     return cachedSegments;
   }
 
+  private void addFilesToCachedSegments(File file, AtomicInteger ignored, 
List<DataSegment> cachedSegments) throws IOException
+  {
+    final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
+    boolean removeInfo = false;
+    if (!segment.getId().toString().equals(file.getName())) {
+      log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), 
segment.getId());
+      ignored.incrementAndGet();
+    } else {
+      removeInfo = true;
+      final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
+      for (StorageLocation location : locations) {
+        // check for migrate from old nested local storage path format
+        final File legacyPath = new File(location.getPath(), 
DataSegmentPusher.getDefaultStorageDir(segment, false));
+        if (legacyPath.exists()) {
+          final File destination = 
cacheEntry.toPotentialLocation(location.getPath());
+          FileUtils.mkdirp(destination);
+          final File[] oldFiles = legacyPath.listFiles();
+          final File[] newFiles = destination.listFiles();
+          // make sure old files exist and new files do not exist
+          if (oldFiles != null && oldFiles.length > 0 && newFiles != null && 
newFiles.length == 0) {
+            Files.move(legacyPath.toPath(), destination.toPath(), 
StandardCopyOption.ATOMIC_MOVE);
+          }
+          cleanupLegacyCacheLocation(location.getPath(), legacyPath);
+        }
+
+        if (cacheEntry.checkExists(location.getPath())) {
+          removeInfo = false;
+          final boolean reserveResult;
+          if (config.isVirtualStorage()) {
+            reserveResult = location.reserveWeak(cacheEntry);
+          } else {
+            reserveResult = location.reserve(cacheEntry);
+          }
+          if (!reserveResult) {
+            log.makeAlert(
+                "storage[%s:%,d] has more segments than it is allowed. 
Currently loading Segment[%s:%,d]. Please increase druid.segmentCache.locations 
maxSize param",
+                location.getPath(),
+                location.availableSizeBytes(),
+                segment.getId(),
+                segment.getSize()
+            ).emit();
+          }
+          cachedSegments.add(segment);
+        }
+      }
+    }
+
+    if (removeInfo) {
+      final SegmentId segmentId = segment.getId();
+      log.warn("Unable to find cache file for segment[%s]. Deleting lookup 
entry.", segmentId);
+      removeInfoFile(segment);
+    }
+  }
+
+  private File[] retrieveSegmentMetadataFiles() throws IOException
+  {
+    final File infoDir = getEffectiveInfoDir();
+    FileUtils.mkdirp(infoDir);
+    File[] files = infoDir.listFiles();
+    return files == null ? new File[0] : files;
+  }
+
   @Override
   public void storeInfoFile(final DataSegment segment) throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to