krishan1390 commented on code in PR #16249:
URL: https://github.com/apache/pinot/pull/16249#discussion_r2184695459
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -195,30 +201,20 @@ public List<SegmentConversionResult>
executeTask(PinotTaskConfig pinotTaskConfig
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(),
taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
try {
- List<File> inputSegmentDirs = new ArrayList<>();
- int numRecords = 0;
-
- for (int i = 0; i < downloadURLs.length; i++) {
- String segmentName = segmentNames[i];
- // Download and decompress the segment file
- _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and
decompressing segment from: " + downloadURLs[i]
- + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
- File indexDir;
- try {
- indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURLs[i], taskType,
- tempDataDir, "_" + i);
- } catch (Exception e) {
- LOGGER.error("Failed to download segment from download url: {}",
downloadURLs[i], e);
- _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
- _eventObserver.notifyTaskError(_pinotTaskConfig, e);
- throw e;
+ AtomicInteger recordCount = new AtomicInteger(0);
+ List<File> inputSegmentDirs = Collections.synchronizedList(new
ArrayList<>(downloadURLs.length));
+ int nThreads =
Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.SEGMENT_DOWNLOAD_THREAD_POOL_SIZE,
Review Comment:
We can also set nThreads = Math.min(downloadURLs.length, nThreads) so that the pool never creates more threads than there are segments to download (e.g., a task that downloads just 1 segment doesn't spin up extra threads for it).
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +348,67 @@ public List<SegmentConversionResult>
executeTask(PinotTaskConfig pinotTaskConfig
}
}
+ private void parallelDownloadAndUntarSegments(int nThreads, String
tableNameWithType, String taskType,
+ String[] segmentNames, String[] downloadURLs, File tempDataDir,
AtomicInteger recordCounter,
+ List<File> inputSegmentDirs)
+ throws Exception {
+
+ ExecutorService executorService = null;
+ int length = downloadURLs.length;
+ try {
+ executorService = Executors.newFixedThreadPool(nThreads);
+ List<Future<Void>> futures = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ int index = i;
+ futures.add(executorService.submit(() -> {
+ downloadAndUntarSegment(tableNameWithType, taskType,
segmentNames[index], downloadURLs[index],
+ tempDataDir, index, recordCounter, inputSegmentDirs);
+ return null;
+ }));
+
+ // Wait for all downloads to complete and cancel other tasks if any
download fails
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ // Cancel all other download tasks
+ for (Future<Void> f : futures) {
+ f.cancel(true);
+ }
+ throw e;
+ }
+ }
+ }
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private void downloadAndUntarSegment(String tableNameWithType, String
taskType,
+ String segmentName, String downloadURL, File tempDataDir, int index,
AtomicInteger recordCounter,
+ List<File> inputSegmentDirs)
+ throws Exception {
+ try {
+ _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and
decompressing segment from: " + downloadURL
+ + " (" + (index + 1) + " out of " + inputSegmentDirs.size() + ")");
+ // Download and decompress the segment file
+ File indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURL, taskType,
+ tempDataDir, "_" + index);
+ reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
+ // Ensure segment directory placement is at same index as the segment
name in the inputSegmentNames
+ inputSegmentDirs.set(index, indexDir);
+ recordCounter.addAndGet(segmentMetadata.getTotalDocs());
+ } catch (Exception e) {
Review Comment:
Nit: we only need to catch the exception around downloadSegmentToLocalAndUntar, right? We should avoid wrapping the entire block of code in the try/catch.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -195,30 +201,20 @@ public List<SegmentConversionResult>
executeTask(PinotTaskConfig pinotTaskConfig
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(),
taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
try {
- List<File> inputSegmentDirs = new ArrayList<>();
- int numRecords = 0;
-
- for (int i = 0; i < downloadURLs.length; i++) {
- String segmentName = segmentNames[i];
- // Download and decompress the segment file
- _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and
decompressing segment from: " + downloadURLs[i]
- + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
- File indexDir;
- try {
- indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURLs[i], taskType,
- tempDataDir, "_" + i);
- } catch (Exception e) {
- LOGGER.error("Failed to download segment from download url: {}",
downloadURLs[i], e);
- _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
- _eventObserver.notifyTaskError(_pinotTaskConfig, e);
- throw e;
+ AtomicInteger recordCount = new AtomicInteger(0);
+ List<File> inputSegmentDirs = Collections.synchronizedList(new
ArrayList<>(downloadURLs.length));
+ int nThreads =
Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.SEGMENT_DOWNLOAD_THREAD_POOL_SIZE,
Review Comment:
Would it be better to read this from the minion config (minionConf) rather than the task config? If it's part of the task config, we will have to modify the task configs of every table to make use of this feature.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]