This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 35878952a44 improve segment cache mount/unmount resilience if interrupted (#18833)
35878952a44 is described below

commit 35878952a440a49e5630c2540459483fb1aad46d
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Dec 10 21:05:28 2025 -0800

    improve segment cache mount/unmount resilience if interrupted (#18833)
---
 .../segment/loading/SegmentLocalCacheManager.java  |   9 +-
 .../SegmentLocalCacheManagerConcurrencyTest.java   | 127 ++++++++++++++++++++-
 2 files changed, 128 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index f56a4aa1264..65c06c69c01 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -1003,7 +1003,7 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
               );
               atomicMoveAndDeleteCacheEntryDirectory(storageDir);
             } else {
-              needsLoad = false;
+              needsLoad = referenceProvider != null;
             }
           }
           if (needsLoad) {
@@ -1069,14 +1069,17 @@ public class SegmentLocalCacheManager implements 
SegmentCacheManager
       try {
         synchronized (this) {
           if (referenceProvider != null) {
-            referenceProvider.close();
+            ReferenceCountedSegmentProvider provider = referenceProvider;
             referenceProvider = null;
+            provider.close();
           }
           if (!config.isDeleteOnRemove()) {
             return;
           }
           if (storageDir != null) {
-            atomicMoveAndDeleteCacheEntryDirectory(storageDir);
+            if (storageDir.exists()) {
+              atomicMoveAndDeleteCacheEntryDirectory(storageDir);
+            }
             storageDir = null;
             location = null;
           }
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index aa759c7e120..85e065f701b 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -319,7 +319,7 @@ class SegmentLocalCacheManagerConcurrencyTest
     makeSegmentsToLoad(segmentCount, localStorageFolder, interval, 
segmentsToWeakLoad);
 
     for (boolean sleepy : new boolean[]{true, false}) {
-      testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy, 
false);
+      testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy, 
false, false);
     }
   }
 
@@ -337,7 +337,7 @@ class SegmentLocalCacheManagerConcurrencyTest
     makeSegmentsToLoad(segmentCount, localStorageFolder, interval, 
segmentsToWeakLoad);
 
     for (boolean sleepy : new boolean[]{true, false}) {
-      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true);
+      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true, false);
     }
   }
 
@@ -355,7 +355,7 @@ class SegmentLocalCacheManagerConcurrencyTest
     makeSegmentsToLoad(segmentCount, localStorageFolder, interval, 
segmentsToWeakLoad);
 
     for (boolean sleepy : new boolean[]{true, false}) {
-      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true);
+      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true, false);
     }
   }
 
@@ -373,7 +373,7 @@ class SegmentLocalCacheManagerConcurrencyTest
 
     for (boolean sleepy : new boolean[]{true, false}) {
       // use different segments for each run, otherwise the 2nd run is all 
cache hits
-      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true);
+      testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy, 
true, true);
     }
   }
 
@@ -576,7 +576,8 @@ class SegmentLocalCacheManagerConcurrencyTest
       int concurrentReads,
       boolean random,
       boolean sleepy,
-      boolean expectHits
+      boolean expectHits,
+      boolean expectNoFailures
   )
   {
     int totalSuccess = 0;
@@ -634,6 +635,10 @@ class SegmentLocalCacheManagerConcurrencyTest
     Assertions.assertTrue(totalFailures <= 
location.getWeakStats().getRejectCount() + location2.getWeakStats()
                                                                                
                .getRejectCount());
 
+    if (expectNoFailures) {
+      Assertions.assertEquals(0, totalFailures);
+      Assertions.assertEquals(iterations, totalSuccess);
+    }
     if (expectHits) {
       Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
       Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
@@ -708,7 +713,82 @@ class SegmentLocalCacheManagerConcurrencyTest
     return new BatchResult(exceptions, success, rows);
   }
 
+  @Test
+  public void testAcquireSegmentOnDemandRandomSegmentWithInterrupt() throws 
IOException, InterruptedException
+  {
+    final int segmentCount = 8;
+    final int iterations = 2000;
+    final int concurrentReads = 10;
+    final File localStorageFolder = new File(tempDir, "local_storage_folder");
+
+    final Interval interval = Intervals.of("2019-01-01/P1D");
+
+    makeSegmentsToLoad(segmentCount, localStorageFolder, interval, 
segmentsToWeakLoad);
+
+    final List<DataSegment> currentBatch = new ArrayList<>();
+    for (int i = 0; i < iterations; i++) {
+      
currentBatch.add(segmentsToWeakLoad.get(ThreadLocalRandom.current().nextInt(segmentCount)));
+      // process batches of 10 requests at a time
+      if (currentBatch.size() == concurrentReads) {
+        final List<InterruptedLoad> weakLoads = currentBatch
+            .stream()
+            .map(segment -> new InterruptedLoad(virtualStorageManager, 
segment))
+            .collect(Collectors.toList());
+        final List<Future<Integer>> futures = new ArrayList<>();
+        for (InterruptedLoad weakLoad : weakLoads) {
+          futures.add(executorService.submit(weakLoad));
+        }
+        for (Future<Integer> future : futures) {
+          try {
+            future.get(20L, TimeUnit.MILLISECONDS);
+          }
+          catch (Throwable t) {
+          }
+        }
+        while (true) {
+          boolean allDone = true;
+          for (Future<?> f : futures) {
+            allDone = allDone && f.isDone();
+          }
+          if (allDone) {
+            break;
+          }
+          Thread.sleep(5);
+        }
+        Assertions.assertEquals(0, location.getActiveWeakHolds());
+        Assertions.assertEquals(0, location2.getActiveWeakHolds());
+        currentBatch.clear();
+      }
+    }
+
+    Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+    Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
+
+    // now ensure that we can successfully do stuff after all those interrupts
+    int totalSuccess = 0;
+    int totalFailures = 0;
+    for (int i = 0; i < iterations; i++) {
+      int segment = ThreadLocalRandom.current().nextInt(segmentCount);
+      currentBatch.add(segmentsToWeakLoad.get(segment));
+      // process batches of 10 requests at a time
+      if (currentBatch.size() == concurrentReads) {
 
+        BatchResult result = testWeakBatch(i, currentBatch, false);
+        totalSuccess += result.success;
+        totalFailures += result.exceptions.size();
+        currentBatch.clear();
+      }
+    }
+    Assertions.assertEquals(iterations, totalSuccess);
+    Assertions.assertEquals(0, totalFailures);
+    Assertions.assertEquals(0, location.getActiveWeakHolds());
+    Assertions.assertEquals(0, location2.getActiveWeakHolds());
+    Assertions.assertTrue(4 >= location.getWeakEntryCount());
+    Assertions.assertTrue(4 >= location2.getWeakEntryCount());
+    // 5 because __drop path
+    Assertions.assertTrue(5 >= location.getPath().listFiles().length);
+    Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
+  }
 
   private void assertNoLooseEnds()
   {
@@ -925,4 +1005,41 @@ class SegmentLocalCacheManagerConcurrencyTest
       }
     }
   }
+
+  private static class InterruptedLoad implements Callable<Integer>
+  {
+    private final SegmentLocalCacheManager segmentManager;
+    private final DataSegment segment;
+
+    private InterruptedLoad(
+        SegmentLocalCacheManager segmentManager,
+        DataSegment segment
+    )
+    {
+      this.segmentManager = segmentManager;
+      this.segment = segment;
+    }
+
+    @Override
+    public Integer call() throws SegmentLoadingException
+    {
+      final Closer closer = Closer.create();
+      final AcquireSegmentAction action = closer.register(
+          segmentManager.acquireSegment(segment)
+      );
+      try {
+        final Future<AcquireSegmentResult> result = action.getSegmentFuture();
+        Thread.sleep(ThreadLocalRandom.current().nextInt(50));
+        result.cancel(true);
+        Thread.currentThread().interrupt();
+      }
+      catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+      finally {
+        CloseableUtils.closeAndWrapExceptions(closer);
+      }
+      return null;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to