dsmiley commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r3199011876


##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -52,6 +62,21 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be 
configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable 
{@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
+  private static final ExecutorService BACKUP_EXECUTOR =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          Math.max(1, DEFAULT_MAX_PARALLEL_UPLOADS),

Review Comment:
   I wouldn't bother with the 'max'; someone configuring this should know what 
they are doing.  Heck, if they put -1 thinking it might disable the feature, 
they'll now probably get the error they _should_ get.



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,35 +128,96 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      ExecutorService executor = RESTORE_EXECUTOR;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
+      try {
+        // Move all files from backupDir to restoreIndexDir
+        for (String filename : repository.listAllFiles()) {
+          checkInterrupted();
+
+          // Capture variables for lambda
+          final String filenameFinal = filename;
+          final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+          Runnable downloadTask =
+              () -> {
+                try {
+                  if (fileExistsLocally) {
+                    Checksum cs = repository.checksum(filenameFinal);
+                    IndexFetcher.CompareResult compareResult;
+                    if (cs == null) {
+                      compareResult = new IndexFetcher.CompareResult();
+                      compareResult.equal = false;
+                    } else {
+                      compareResult =
+                          IndexFetcher.compareFile(
+                              finalIndexDir, filenameFinal, cs.size, 
cs.checksum);
+                    }
+                    if (!compareResult.equal
+                        || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                            filenameFinal, cs.size, compareResult))) {
+                      repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                    } else {
+                      // prefer local copy
+                      repository.localCopy(finalIndexDir, filenameFinal, 
finalRestoreIndexDir);
+                    }
+                  } else {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  }
+                } catch (Exception e) {
+                  log.warn("Exception while restoring the backup index ", e);
+                  throw new RuntimeException(
+                      "Exception while restoring the backup index for file: " 
+ filenameFinal, e);
+                }
+              };
+
+          downloadFutures.add(executor.submit(downloadTask));
+        }
+
+        // Wait for ALL futures to ensure all files are processed
+        Throwable firstError = null;
+        for (Future<?> future : downloadFutures) {

Review Comment:
   The same code-review feedback applies to this class.



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues 
with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())

Review Comment:
   I agree that it's harder to control; let's drop it.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();

Review Comment:
   Right. I'd actually flip the `try` and `for` nesting so we don't continue 
loop iterations on an error condition.  Don't bother propagating interruption 
status to this thread as we're going to end things expeditiously and report the 
error.
   
   Looking forward to Structured Concurrency some day to make this overall 
easier/better.  We'd have to `--enable-preview`  since it's _still_ incubating 
:-/



##########
solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java:
##########
@@ -88,6 +88,12 @@ public boolean reject(Thread t) {
       return true;
     }
 
+    // Static backup/restore thread pools - stateless, no core references, 
threads expire on idle
+    if (threadName.startsWith("IncrementalBackupExecutor-")

Review Comment:
   This should be touched sparingly.  We care a lot about cleaning up resources 
completely between tests, so as long as a CoreContainer shuts down, all the 
ExecutorServices inside had better shut down completely as part of the shutdown 
sequence.



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -52,6 +62,21 @@
  */
 public class IncrementalShardBackup {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Maximum number of files to upload in parallel during backup. Can be 
configured via the system
+   * property {@code solr.backup.maxparalleluploads} or environment variable 
{@code
+   * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+   */
+  private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
+      EnvUtils.getPropertyAsInteger("solr.backup.maxparalleluploads", 1);
+
+  private static final ExecutorService BACKUP_EXECUTOR =

Review Comment:
   As this is used for "uploads" (in the context of backups) and not for 
backups themselves, I think the name should reflect that, like 
BACKUP_UPLOAD_EXECUTOR (and similarly for the thread name).



##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String> 
indexFiles, Directory dir
     URI indexDir = incBackupFiles.getIndexDir();
     BackupStats backupStats = new BackupStats();
 
-    for (String fileName : indexFiles) {
-      Optional<ShardBackupMetadata.BackedFile> opBackedFile = 
oldBackupPoint.getFile(fileName);
-      Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
-      if (opBackedFile.isPresent()) {
-        ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
-        Checksum existedFileCS = backedFile.fileChecksum;
-        if (existedFileCS.equals(originalFileCS)) {
-          currentBackupPoint.addBackedFile(opBackedFile.get());
-          backupStats.skippedUploadingFile(existedFileCS);
-          continue;
-        }
+    ExecutorService executor = BACKUP_EXECUTOR;
+
+    List<Future<?>> uploadFutures = new ArrayList<>();
+
+    try {
+      for (String fileName : indexFiles) {
+        // Capture variable for lambda
+        final String fileNameFinal = fileName;
+
+        Runnable uploadTask =
+            () -> {
+              try {
+                // Calculate checksum and check if file already exists in 
previous backup
+                Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+                    oldBackupPoint.getFile(fileNameFinal);
+                Checksum originalFileCS = backupRepo.checksum(dir, 
fileNameFinal);
+
+                if (opBackedFile.isPresent()) {
+                  ShardBackupMetadata.BackedFile backedFile = 
opBackedFile.get();
+                  Checksum existedFileCS = backedFile.fileChecksum;
+                  if (existedFileCS.equals(originalFileCS)) {
+                    synchronized (currentBackupPoint) {
+                      currentBackupPoint.addBackedFile(opBackedFile.get());
+                    }
+                    backupStats.skippedUploadingFile(existedFileCS);
+                    return;
+                  }
+                }
+
+                // File doesn't exist or has changed - upload it
+                String backedFileName = UUID.randomUUID().toString();
+                backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir, 
backedFileName);
+
+                synchronized (currentBackupPoint) {
+                  currentBackupPoint.addBackedFile(backedFileName, 
fileNameFinal, originalFileCS);
+                }
+                backupStats.uploadedFile(originalFileCS);
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to process file: " + 
fileNameFinal, e);
+              }
+            };
+
+        uploadFutures.add(executor.submit(uploadTask));
       }
 
-      String backedFileName = UUID.randomUUID().toString();
-      backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+      // Wait for ALL futures before throwing - currentBackupPoint must 
reflect every
+      // successfully uploaded file before it is written, even when an error 
occurs.
+      Throwable firstError = null;
+      for (Future<?> future : uploadFutures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          if (firstError == null) {
+            firstError = e.getCause();
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          if (firstError == null) {
+            firstError = e;
+          }
+        }
+      }
 
-      currentBackupPoint.addBackedFile(backedFileName, fileName, 
originalFileCS);
-      backupStats.uploadedFile(originalFileCS);
+      if (firstError != null) {
+        if (firstError instanceof Error) {
+          // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+          throw (Error) firstError;
+        } else if (firstError instanceof IOException) {
+          throw (IOException) firstError;
+        } else if (firstError instanceof RuntimeException) {
+          throw (RuntimeException) firstError;
+        } else if (firstError instanceof InterruptedException) {
+          throw new IOException("Backup interrupted", firstError);
+        } else {
+          throw new IOException("Error during parallel backup upload", 
firstError);

Review Comment:
   At most API layers in Solr, SolrException is best.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to