Copilot commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r2908810858
##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
- // Move all files from backupDir to restoreIndexDir
- for (String filename : repository.listAllFiles()) {
- checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ // Only use an executor for parallel downloads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+ ExecutorService executor =
+ maxParallelDownloads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelDownloads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("RestoreCore"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
Review Comment:
Using `SynchronousQueue` with `CallerRunsPolicy` means once all
`maxParallelDownloads` threads are busy, additional downloads will execute on
the calling thread. That can exceed the configured cap (up to
`maxParallelDownloads + 1` concurrent transfers) and also bypass the
`MDCAwareThreadPoolExecutor` wrapping for those caller-run tasks. Consider a
bounded queue/fixed pool or explicitly limiting in-flight submissions to
enforce the configured parallelism.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
Review Comment:
With `SynchronousQueue` + `CallerRunsPolicy`, once `maxParallelUploads`
threads are busy, additional uploads will run on the submitting thread. That
can exceed the configured cap (up to `maxParallelUploads + 1` concurrent
uploads) and bypass `MDCAwareThreadPoolExecutor` wrapping for caller-run tasks.
Consider a bounded queue/fixed pool or explicitly bounding in-flight
submissions to enforce the configured limit.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
Review Comment:
This implementation queues a `Future` for every index file and holds them in
`uploadFutures` until the end. For large indexes this can create significant
memory overhead and delays error reporting. Consider processing completed tasks
as they finish (e.g., `ExecutorCompletionService`) and/or limiting in-flight
submissions to `maxParallelUploads`.
##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
- // Move all files from backupDir to restoreIndexDir
- for (String filename : repository.listAllFiles()) {
- checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ // Only use an executor for parallel downloads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+ ExecutorService executor =
+ maxParallelDownloads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelDownloads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("RestoreCore"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> downloadFutures = new ArrayList<>();
+
+ try {
+ // Move all files from backupDir to restoreIndexDir
+ for (String filename : repository.listAllFiles()) {
+ checkInterrupted();
+
+ // Capture variables for lambda
+ final String filenameFinal = filename;
+ final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+ Runnable downloadTask =
+ () -> {
+ try {
+ if (fileExistsLocally) {
+ Checksum cs = repository.checksum(filenameFinal);
+ IndexFetcher.CompareResult compareResult;
+ if (cs == null) {
+ compareResult = new IndexFetcher.CompareResult();
+ compareResult.equal = false;
+ } else {
+ compareResult =
+ IndexFetcher.compareFile(
+ finalIndexDir, filenameFinal, cs.size,
cs.checksum);
+ }
+ if (!compareResult.equal
+ || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+ filenameFinal, cs.size, compareResult))) {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ } else {
+ // prefer local copy
+ repository.localCopy(finalIndexDir, filenameFinal,
finalRestoreIndexDir);
+ }
+ } else {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ }
+ } catch (Exception e) {
+ log.warn("Exception while restoring the backup index ", e);
+ throw new RuntimeException(
+ "Exception while restoring the backup index for file: "
+ filenameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ downloadFutures.add(executor.submit(downloadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ downloadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
+ }
+ }
+ }
+
+ // Wait for all downloads to complete and collect any errors (only if
using executor)
+ if (executor != null) {
+ // We need to wait for ALL futures to ensure all files are processed
+ Throwable firstError = null;
+ for (Future<?> future : downloadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ Throwable cause = e.getCause();
+ // Unwrap RuntimeExceptions that wrap the original IOException
+ if (cause instanceof RuntimeException && cause.getCause() !=
null) {
+ firstError = cause.getCause();
+ } else {
Review Comment:
In the parallel path, the `RuntimeException` unwrapping (`firstError =
cause.getCause()`) can discard the wrapper message that contains the filename.
Prefer preserving that message (or re-wrapping the cause with filename context)
when propagating the first failure.
##########
changelog/unreleased/parallelizebackups.yml:
##########
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Parallelize Backup and Restore File Operations
+type: changed
+authors:
+ - name: Samuel Verstraete
+ github: elangelo
Review Comment:
The changelog author metadata uses a `github` field, but this repository’s
changelog format documentation uses `nick` (optionally with `url`) under
`authors`. Using an unexpected key may fail changelog validation or omit author
info; please switch `github: elangelo` to `nick: elangelo` (and add `url` if
desired).
##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
- // Move all files from backupDir to restoreIndexDir
- for (String filename : repository.listAllFiles()) {
- checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ // Only use an executor for parallel downloads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+ ExecutorService executor =
+ maxParallelDownloads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelDownloads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("RestoreCore"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> downloadFutures = new ArrayList<>();
+
+ try {
+ // Move all files from backupDir to restoreIndexDir
+ for (String filename : repository.listAllFiles()) {
+ checkInterrupted();
+
+ // Capture variables for lambda
+ final String filenameFinal = filename;
+ final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+ Runnable downloadTask =
+ () -> {
+ try {
+ if (fileExistsLocally) {
+ Checksum cs = repository.checksum(filenameFinal);
+ IndexFetcher.CompareResult compareResult;
+ if (cs == null) {
+ compareResult = new IndexFetcher.CompareResult();
+ compareResult.equal = false;
+ } else {
+ compareResult =
+ IndexFetcher.compareFile(
+ finalIndexDir, filenameFinal, cs.size,
cs.checksum);
+ }
+ if (!compareResult.equal
+ || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+ filenameFinal, cs.size, compareResult))) {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ } else {
+ // prefer local copy
+ repository.localCopy(finalIndexDir, filenameFinal,
finalRestoreIndexDir);
+ }
+ } else {
+ repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+ }
+ } catch (Exception e) {
+ log.warn("Exception while restoring the backup index ", e);
+ throw new RuntimeException(
+ "Exception while restoring the backup index for file: "
+ filenameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ downloadFutures.add(executor.submit(downloadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ downloadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
Review Comment:
In the synchronous path, unwrapping and rethrowing only `e.getCause()` drops
the wrapper message that includes per-file context (filename). Preserve the
wrapper message (or re-wrap the cause including the filename) so restore
failures remain actionable.
##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
- // Move all files from backupDir to restoreIndexDir
- for (String filename : repository.listAllFiles()) {
- checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ // Only use an executor for parallel downloads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+ ExecutorService executor =
+ maxParallelDownloads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelDownloads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("RestoreCore"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> downloadFutures = new ArrayList<>();
+
Review Comment:
This submits one task per index file and retains every `Future` in
`downloadFutures` until the end. For large collections with many segment files,
that can add substantial memory/GC overhead and delays surfacing failures until
all tasks are submitted. Consider processing completions incrementally (e.g.,
`ExecutorCompletionService`) and/or bounding the number of in-flight tasks to
`maxParallelDownloads`.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ uploadFutures.add(executor.submit(uploadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ uploadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
Review Comment:
In the synchronous path, rethrowing only `e.getCause()` (when it’s an
`IOException`) loses the wrapper message that includes the filename ("Failed to
process file: ..."). Preserve that per-file context when propagating errors so
backup failures are diagnosable.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ uploadFutures.add(executor.submit(uploadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ uploadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
+ }
}
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for all uploads to complete and collect any errors (only if
using executor)
+ if (executor != null) {
+ // We need to wait for ALL futures before throwing, otherwise we might
exit
+ // before all successfully uploaded files are added to
currentBackupPoint
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ Throwable cause = e.getCause();
+ // Unwrap RuntimeExceptions that wrap the original IOException
+ if (cause instanceof RuntimeException && cause.getCause() !=
null) {
+ firstError = cause.getCause();
+ } else {
Review Comment:
In the parallel join logic, unwrapping `RuntimeException` to
`cause.getCause()` can discard the wrapper message that includes the filename.
Preserve the wrapper message (or re-wrap the underlying `IOException` with file
context) when surfacing the first failure from `future.get()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]