GWphua commented on code in PR #18489:
URL: https://github.com/apache/druid/pull/18489#discussion_r2394210764
##########
server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java:
##########
@@ -181,90 +185,120 @@ public boolean canHandleSegments()
}
@Override
- public List<DataSegment> getCachedSegments() throws IOException
+ public Collection<DataSegment> getCachedSegments() throws IOException
{
if (!canHandleSegments()) {
throw DruidException.defensive(
"canHandleSegments() is false. getCachedSegments() must be invoked only when canHandleSegments() returns true."
);
}
- final File infoDir = getEffectiveInfoDir();
- FileUtils.mkdirp(infoDir);
- final List<DataSegment> cachedSegments = new ArrayList<>();
- final File[] segmentsToLoad = infoDir.listFiles();
+ final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
+ final ConcurrentLinkedQueue<DataSegment> cachedSegments = new ConcurrentLinkedQueue<>();
- int ignored = 0;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Retrieving [%d] cached segment metadata files to cache.", segmentsToLoad.length);
- for (int i = 0; i < segmentsToLoad.length; i++) {
- final File file = segmentsToLoad[i];
- log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
- try {
- final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
- boolean removeInfo = false;
- if (!segment.getId().toString().equals(file.getName())) {
- log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getId());
- ignored++;
- } else {
- removeInfo = true;
- final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment);
- for (StorageLocation location : locations) {
- // check for migrate from old nested local storage path format
- final File legacyPath = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
- if (legacyPath.exists()) {
- final File destination = cacheEntry.toPotentialLocation(location.getPath());
- FileUtils.mkdirp(destination);
- final File[] oldFiles = legacyPath.listFiles();
- final File[] newFiles = destination.listFiles();
- // make sure old files exist and new files do not exist
- if (oldFiles != null && oldFiles.length > 0 && newFiles != null && newFiles.length == 0) {
- Files.move(legacyPath.toPath(), destination.toPath(), StandardCopyOption.ATOMIC_MOVE);
- }
- cleanupLegacyCacheLocation(location.getPath(), legacyPath);
- }
+ AtomicInteger ignoredfileCounter = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(segmentsToLoad.length);
+ ExecutorService exec = Objects.requireNonNullElseGet(loadOnBootstrapExec, MoreExecutors::newDirectExecutorService);
Review Comment:
If I'm understanding you correctly, the flow would be something like:
1. Put the result of `getBootstrapSegments()` into an array.
2. Retrieve `File[] segmentsToLoad` from `getEffectiveInfoDir()`.
3. Parallelize, with each thread doing the following:
- Load N cached segments.
- Call `loadSegmentOnBootstrap` for those N cached segments.
- Announce those N cached segments.
4. Threads that have finished their batch can load the bootstrap
segments. (This can start any time after step 1.)

This will indeed involve some refactoring, but I guess we can go with the
steps above? Let me know if I am missing anything.
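
To make the proposed flow concrete, here is a rough, standalone sketch of the steps above. It is not Druid code: `BootstrapFlowSketch`, `readCachedSegment`, `loadOnBootstrap`, `announce`, and the `batchSize`/`threads` parameters are all hypothetical stand-ins, and segments are modelled as plain strings. It only illustrates the batching idea (each task reads, loads, and announces its own N segments, with bootstrap segments queued behind the batches), not how `SegmentLocalCacheManager` would actually be refactored.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BootstrapFlowSketch
{
  // Hypothetical stand-ins for reading an info file, loadSegmentOnBootstrap(), and announcing.
  static String readCachedSegment(String infoFile) { return "segment:" + infoFile; }
  static void loadOnBootstrap(String segment, Queue<String> loaded) { loaded.add(segment); }
  static void announce(List<String> segments) { /* no-op in this sketch */ }

  static Collection<String> run(List<String> bootstrapSegments, List<String> infoFiles, int batchSize, int threads)
  {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    final Queue<String> loaded = new ConcurrentLinkedQueue<>();
    final ExecutorService exec = Executors.newFixedThreadPool(threads);
    final List<Future<?>> futures = new ArrayList<>();
    try {
      // Each task handles one batch of up to N cached segments end to end.
      for (int start = 0; start < infoFiles.size(); start += batchSize) {
        final List<String> batch = infoFiles.subList(start, Math.min(start + batchSize, infoFiles.size()));
        futures.add(exec.submit(() -> {
          final List<String> segments = new ArrayList<>();
          for (String file : batch) {
            segments.add(readCachedSegment(file));
          }
          for (String segment : segments) {
            loadOnBootstrap(segment, loaded);
          }
          announce(segments);
        }));
      }
      // Bootstrap segments are queued behind the batches, so workers pick them up as they free up.
      for (String segment : bootstrapSegments) {
        futures.add(exec.submit(() -> loadOnBootstrap(segment, loaded)));
      }
      // Wait for every task and surface the first failure, if any.
      for (Future<?> future : futures) {
        future.get();
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
    finally {
      exec.shutdown();
    }
    return loaded;
  }

  public static void main(String[] args)
  {
    Collection<String> loaded = run(List.of("boot-1"), List.of("a", "b", "c", "d", "e"), 2, 3);
    System.out.println("Loaded " + loaded.size() + " segments: " + loaded);
  }
}
```

Waiting on the `Future`s rather than a `CountDownLatch` is just a choice made for this sketch, since it propagates task failures without extra bookkeeping; the PR's latch-based approach could be kept instead.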