elangelo commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r2917288426
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +213,160 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
+ // Only use an executor for parallel uploads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues with CallerRunsPolicy
+ int maxParallelUploads = DEFAULT_MAX_PARALLEL_UPLOADS;
+ ExecutorService executor =
+ maxParallelUploads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelUploads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("IncrementalBackup"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ if (executor != null) {
+ uploadFutures.add(executor.submit(uploadTask));
+ } else {
+ // Run synchronously when parallelism is 1
+ try {
+ uploadTask.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw e;
+ }
}
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for all uploads to complete and collect any errors (only if using executor)
+ if (executor != null) {
+ // We need to wait for ALL futures before throwing, otherwise we might exit
+ // before all successfully uploaded files are added to currentBackupPoint
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ Throwable cause = e.getCause();
+ // Unwrap RuntimeExceptions that wrap the original IOException
+ if (cause instanceof RuntimeException && cause.getCause() !=
null) {
+ firstError = cause.getCause();
+ } else {
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]