This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 35878952a44 improve segment cache mount/unmount resilience if interrupted (#18833)
35878952a44 is described below
commit 35878952a440a49e5630c2540459483fb1aad46d
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Dec 10 21:05:28 2025 -0800
improve segment cache mount/unmount resilience if interrupted (#18833)
---
.../segment/loading/SegmentLocalCacheManager.java | 9 +-
.../SegmentLocalCacheManagerConcurrencyTest.java | 127 ++++++++++++++++++++-
2 files changed, 128 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index f56a4aa1264..65c06c69c01 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -1003,7 +1003,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
);
atomicMoveAndDeleteCacheEntryDirectory(storageDir);
} else {
- needsLoad = false;
+ needsLoad = referenceProvider != null;
}
}
if (needsLoad) {
@@ -1069,14 +1069,17 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
try {
synchronized (this) {
if (referenceProvider != null) {
- referenceProvider.close();
+ ReferenceCountedSegmentProvider provider = referenceProvider;
referenceProvider = null;
+ provider.close();
}
if (!config.isDeleteOnRemove()) {
return;
}
if (storageDir != null) {
- atomicMoveAndDeleteCacheEntryDirectory(storageDir);
+ if (storageDir.exists()) {
+ atomicMoveAndDeleteCacheEntryDirectory(storageDir);
+ }
storageDir = null;
location = null;
}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index aa759c7e120..85e065f701b 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -319,7 +319,7 @@ class SegmentLocalCacheManagerConcurrencyTest
makeSegmentsToLoad(segmentCount, localStorageFolder, interval,
segmentsToWeakLoad);
for (boolean sleepy : new boolean[]{true, false}) {
- testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy,
false);
+ testWeakLoad(iterations, segmentCount, concurrentReads, false, sleepy,
false, false);
}
}
@@ -337,7 +337,7 @@ class SegmentLocalCacheManagerConcurrencyTest
makeSegmentsToLoad(segmentCount, localStorageFolder, interval,
segmentsToWeakLoad);
for (boolean sleepy : new boolean[]{true, false}) {
- testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true);
+ testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true, false);
}
}
@@ -355,7 +355,7 @@ class SegmentLocalCacheManagerConcurrencyTest
makeSegmentsToLoad(segmentCount, localStorageFolder, interval,
segmentsToWeakLoad);
for (boolean sleepy : new boolean[]{true, false}) {
- testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true);
+ testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true, false);
}
}
@@ -373,7 +373,7 @@ class SegmentLocalCacheManagerConcurrencyTest
for (boolean sleepy : new boolean[]{true, false}) {
// use different segments for each run, otherwise the 2nd run is all cache hits
- testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true);
+ testWeakLoad(iterations, segmentCount, concurrentReads, true, sleepy,
true, true);
}
}
@@ -576,7 +576,8 @@ class SegmentLocalCacheManagerConcurrencyTest
int concurrentReads,
boolean random,
boolean sleepy,
- boolean expectHits
+ boolean expectHits,
+ boolean expectNoFailures
)
{
int totalSuccess = 0;
@@ -634,6 +635,10 @@ class SegmentLocalCacheManagerConcurrencyTest
Assertions.assertTrue(totalFailures <=
location.getWeakStats().getRejectCount() + location2.getWeakStats()
.getRejectCount());
+ if (expectNoFailures) {
+ Assertions.assertEquals(0, totalFailures);
+ Assertions.assertEquals(iterations, totalSuccess);
+ }
if (expectHits) {
Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
@@ -708,7 +713,82 @@ class SegmentLocalCacheManagerConcurrencyTest
return new BatchResult(exceptions, success, rows);
}
+ @Test
+ public void testAcquireSegmentOnDemandRandomSegmentWithInterrupt() throws
IOException, InterruptedException
+ {
+ final int segmentCount = 8;
+ final int iterations = 2000;
+ final int concurrentReads = 10;
+ final File localStorageFolder = new File(tempDir, "local_storage_folder");
+
+ final Interval interval = Intervals.of("2019-01-01/P1D");
+
+ makeSegmentsToLoad(segmentCount, localStorageFolder, interval,
segmentsToWeakLoad);
+
+ final List<DataSegment> currentBatch = new ArrayList<>();
+ for (int i = 0; i < iterations; i++) {
+
currentBatch.add(segmentsToWeakLoad.get(ThreadLocalRandom.current().nextInt(segmentCount)));
+ // process batches of 10 requests at a time
+ if (currentBatch.size() == concurrentReads) {
+ final List<InterruptedLoad> weakLoads = currentBatch
+ .stream()
+ .map(segment -> new InterruptedLoad(virtualStorageManager,
segment))
+ .collect(Collectors.toList());
+ final List<Future<Integer>> futures = new ArrayList<>();
+ for (InterruptedLoad weakLoad : weakLoads) {
+ futures.add(executorService.submit(weakLoad));
+ }
+ for (Future<Integer> future : futures) {
+ try {
+ future.get(20L, TimeUnit.MILLISECONDS);
+ }
+ catch (Throwable t) {
+ }
+ }
+ while (true) {
+ boolean allDone = true;
+ for (Future<?> f : futures) {
+ allDone = allDone && f.isDone();
+ }
+ if (allDone) {
+ break;
+ }
+ Thread.sleep(5);
+ }
+ Assertions.assertEquals(0, location.getActiveWeakHolds());
+ Assertions.assertEquals(0, location2.getActiveWeakHolds());
+ currentBatch.clear();
+ }
+ }
+
+ Assertions.assertTrue(location.getWeakStats().getHitCount() >= 0);
+ Assertions.assertTrue(location2.getWeakStats().getHitCount() >= 0);
+
+ // now ensure that we can successfully do stuff after all those interrupts
+ int totalSuccess = 0;
+ int totalFailures = 0;
+ for (int i = 0; i < iterations; i++) {
+ int segment = ThreadLocalRandom.current().nextInt(segmentCount);
+ currentBatch.add(segmentsToWeakLoad.get(segment));
+ // process batches of 10 requests at a time
+ if (currentBatch.size() == concurrentReads) {
+ BatchResult result = testWeakBatch(i, currentBatch, false);
+ totalSuccess += result.success;
+ totalFailures += result.exceptions.size();
+ currentBatch.clear();
+ }
+ }
+ Assertions.assertEquals(iterations, totalSuccess);
+ Assertions.assertEquals(0, totalFailures);
+ Assertions.assertEquals(0, location.getActiveWeakHolds());
+ Assertions.assertEquals(0, location2.getActiveWeakHolds());
+ Assertions.assertTrue(4 >= location.getWeakEntryCount());
+ Assertions.assertTrue(4 >= location2.getWeakEntryCount());
+ // 5 because __drop path
+ Assertions.assertTrue(5 >= location.getPath().listFiles().length);
+ Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
+ }
private void assertNoLooseEnds()
{
@@ -925,4 +1005,41 @@ class SegmentLocalCacheManagerConcurrencyTest
}
}
}
+
+ private static class InterruptedLoad implements Callable<Integer>
+ {
+ private final SegmentLocalCacheManager segmentManager;
+ private final DataSegment segment;
+
+ private InterruptedLoad(
+ SegmentLocalCacheManager segmentManager,
+ DataSegment segment
+ )
+ {
+ this.segmentManager = segmentManager;
+ this.segment = segment;
+ }
+
+ @Override
+ public Integer call() throws SegmentLoadingException
+ {
+ final Closer closer = Closer.create();
+ final AcquireSegmentAction action = closer.register(
+ segmentManager.acquireSegment(segment)
+ );
+ try {
+ final Future<AcquireSegmentResult> result = action.getSegmentFuture();
+ Thread.sleep(ThreadLocalRandom.current().nextInt(50));
+ result.cancel(true);
+ Thread.currentThread().interrupt();
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ finally {
+ CloseableUtils.closeAndWrapExceptions(closer);
+ }
+ return null;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]