elangelo commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r3200309649


##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          if (firstError == null) {
+            firstError = e;
+          }
+        }
+      }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+      if (firstError != null) {
+        if (firstError instanceof Error) {
+          // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+          throw (Error) firstError;
+        } else if (firstError instanceof IOException) {
+          throw (IOException) firstError;
+        } else if (firstError instanceof RuntimeException) {
+          throw (RuntimeException) firstError;
+        } else if (firstError instanceof InterruptedException) {
+          throw new IOException("Backup interrupted", firstError);
+        } else {
+          throw new IOException("Error during parallel backup upload", 
firstError);
+        }

Review Comment:
   fixed



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;

Review Comment:
   fixed



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -52,6 +62,21 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be 
configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable 
{@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_UPLOADS =

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to