This is an automated email from the ASF dual-hosted git repository.
gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 905f6d22df8 wire lazyLoadOnStart back up to segment loader config (#18637)
905f6d22df8 is described below
commit 905f6d22df8f639507f3a88f6b275fdcecbd7434
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Oct 16 23:28:11 2025 -0700
wire lazyLoadOnStart back up to segment loader config (#18637)
---
.../segment/loading/SegmentLocalCacheManager.java | 20 ++++++++----
.../org/apache/druid/server/ServerManager.java | 32 +++++++++++--------
.../loading/SegmentLocalCacheManagerTest.java | 37 ++++++++++++++++++++++
3 files changed, 69 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index c835026a224..7a61329b8e8 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -639,9 +639,11 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
if (cacheEntry.checkExists(location.getPath())) {
if (location.isReserved(cacheEntry.id) ||
location.reserve(cacheEntry)) {
final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
- entry.lazyLoadCallback = segmentLoadFailCallback;
- entry.mount(location);
- return entry;
+ if (entry != null) {
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location);
+ return entry;
+ }
} else {
// entry is not reserved, clean it up
deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
@@ -658,9 +660,11 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
if (location.reserve(cacheEntry)) {
try {
final SegmentCacheEntry entry =
location.getCacheEntry(cacheEntry.id);
- entry.lazyLoadCallback = segmentLoadFailCallback;
- entry.mount(location);
- return entry;
+ if (entry != null) {
+ entry.lazyLoadCallback = segmentLoadFailCallback;
+ entry.mount(location);
+ return entry;
+ }
}
catch (SegmentLoadingException e) {
log.warn(e, "Failed to load segment[%s] in location[%s], trying next
location", cacheEntry.id, location.getPath());
@@ -831,7 +835,9 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
final SegmentizerFactory factory = getSegmentFactory(storageDir);
- final Segment segment = factory.factorize(dataSegment, storageDir,
false, lazyLoadCallback);
+ @SuppressWarnings("ObjectEquality")
+ final boolean lazy = config.isLazyLoadOnStart() && lazyLoadCallback
!= SegmentLazyLoadFailCallback.NOOP;
+ final Segment segment = factory.factorize(dataSegment, storageDir,
lazy, lazyLoadCallback);
// wipe load callback after calling
lazyLoadCallback = SegmentLazyLoadFailCallback.NOOP;
referenceProvider = ReferenceCountedSegmentProvider.of(segment);
diff --git a/server/src/main/java/org/apache/druid/server/ServerManager.java
b/server/src/main/java/org/apache/druid/server/ServerManager.java
index 732645b42ae..b7652090af4 100644
--- a/server/src/main/java/org/apache/druid/server/ServerManager.java
+++ b/server/src/main/java/org/apache/druid/server/ServerManager.java
@@ -308,21 +308,27 @@ public class ServerManager implements QuerySegmentWalker
final ListenableFuture<ReferenceCountedObjectProvider<Segment>> future
= futures.get(i);
final ReferenceCountedObjectProvider<Segment> referenceProvider =
future.get(timeoutAt - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- final Optional<Segment> segment = referenceProvider.acquireReference();
- try {
- final Optional<Segment> mappedSegment =
segmentMapFunction.apply(segment).map(safetyNet::register);
+ if (referenceProvider == null) {
segmentReferences.add(
- new SegmentReference(
- segmentAndDescriptor.getDescriptor(),
- mappedSegment,
- action
- )
+ new SegmentReference(segmentAndDescriptor.getDescriptor(),
Optional.empty(), action)
);
- }
- catch (Throwable t) {
- // if applying the mapFn failed, attach the base segment to the
closer and rethrow
- segment.ifPresent(safetyNet::register);
- throw t;
+ } else {
+ final Optional<Segment> segment =
referenceProvider.acquireReference();
+ try {
+ final Optional<Segment> mappedSegment =
segmentMapFunction.apply(segment).map(safetyNet::register);
+ segmentReferences.add(
+ new SegmentReference(
+ segmentAndDescriptor.getDescriptor(),
+ mappedSegment,
+ action
+ )
+ );
+ }
+ catch (Throwable t) {
+ // if applying the mapFn failed, attach the base segment to the
closer and rethrow
+ segment.ifPresent(safetyNet::register);
+ throw t;
+ }
}
}
catch (Throwable t) {
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index cf64244b86c..0ac4796cd3f 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -839,6 +839,43 @@ public class SegmentLocalCacheManagerTest extends
InitializedNullHandlingTest
Assert.assertEquals(dataSegment.getInterval(),
actualBootstrapSegment.getDataInterval());
}
+
+ @Test
+ public void testGetBootstrapSegmentLazy() throws SegmentLoadingException
+ {
+ final StorageLocationConfig locationConfig = new
StorageLocationConfig(localSegmentCacheDir, 10000L, null);
+ final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public boolean isLazyLoadOnStart()
+ {
+ return true;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return List.of(locationConfig);
+ }
+ };
+ final List<StorageLocation> storageLocations =
loaderConfig.toStorageLocations();
+ SegmentLocalCacheManager manager = new SegmentLocalCacheManager(
+ storageLocations,
+ loaderConfig,
+ new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
+ TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT),
+ jsonMapper
+ );
+
+ final DataSegment dataSegment = TestSegmentUtils.makeSegment("foo", "v1",
Intervals.of("2020/2021"));
+
+ manager.bootstrap(dataSegment, () -> {});
+ Segment actualBootstrapSegment =
manager.acquireCachedSegment(dataSegment).orElse(null);
+ Assert.assertNotNull(actualBootstrapSegment);
+ Assert.assertEquals(dataSegment.getId(), actualBootstrapSegment.getId());
+ Assert.assertEquals(dataSegment.getInterval(),
actualBootstrapSegment.getDataInterval());
+ }
+
@Test
public void testGetSegmentVirtualStorage() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]