elangelo commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r2917281641


##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues 
with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())

Review Comment:
   The CallerRunsPolicy fallback does mean the submitting thread can run a task 
when the pool is saturated, but the submitting thread is the Solr request 
thread — it already carries full MDC context, so there's no MDC loss here. 
MDCAwareThreadPoolExecutor exists to propagate MDC to new pool threads; the 
caller-runs case doesn't need that propagation. On the cap concern: the 
maxParallel* setting is a throughput knob, not a hard safety limit. An 
occasional N+1 concurrent transfer when the pool is fully busy is negligible 
for a backup/restore workload.



##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
                   DirectoryFactory.DirContext.DEFAULT,
                   core.getSolrConfig().indexConfig.lockType);
       Set<String> indexDirFiles = new 
HashSet<>(Arrays.asList(indexDir.listAll()));
-      // Move all files from backupDir to restoreIndexDir
-      for (String filename : repository.listAllFiles()) {
-        checkInterrupted();
-        try {
-          if (indexDirFiles.contains(filename)) {
-            Checksum cs = repository.checksum(filename);
-            IndexFetcher.CompareResult compareResult;
-            if (cs == null) {
-              compareResult = new IndexFetcher.CompareResult();
-              compareResult.equal = false;
-            } else {
-              compareResult = IndexFetcher.compareFile(indexDir, filename, 
cs.size, cs.checksum);
+
+      // Capture directories as final for lambda access
+      final Directory finalIndexDir = indexDir;
+      final Directory finalRestoreIndexDir = restoreIndexDir;
+
+      // Only use an executor for parallel downloads when parallelism > 1
+      // When set to 1, run synchronously to avoid thread-local state issues 
with CallerRunsPolicy
+      int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+      ExecutorService executor =
+          maxParallelDownloads > 1
+              ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+                  0,
+                  maxParallelDownloads,
+                  60L,
+                  TimeUnit.SECONDS,
+                  new SynchronousQueue<>(),
+                  new SolrNamedThreadFactory("RestoreCore"),
+                  new ThreadPoolExecutor.CallerRunsPolicy())
+              : null;
+
+      List<Future<?>> downloadFutures = new ArrayList<>();
+
+      try {
+        // Move all files from backupDir to restoreIndexDir
+        for (String filename : repository.listAllFiles()) {
+          checkInterrupted();
+
+          // Capture variables for lambda
+          final String filenameFinal = filename;
+          final boolean fileExistsLocally = indexDirFiles.contains(filename);
+
+          Runnable downloadTask =
+              () -> {
+                try {
+                  if (fileExistsLocally) {
+                    Checksum cs = repository.checksum(filenameFinal);
+                    IndexFetcher.CompareResult compareResult;
+                    if (cs == null) {
+                      compareResult = new IndexFetcher.CompareResult();
+                      compareResult.equal = false;
+                    } else {
+                      compareResult =
+                          IndexFetcher.compareFile(
+                              finalIndexDir, filenameFinal, cs.size, 
cs.checksum);
+                    }
+                    if (!compareResult.equal
+                        || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                            filenameFinal, cs.size, compareResult))) {
+                      repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                    } else {
+                      // prefer local copy
+                      repository.localCopy(finalIndexDir, filenameFinal, 
finalRestoreIndexDir);
+                    }
+                  } else {
+                    repository.repoCopy(filenameFinal, finalRestoreIndexDir);
+                  }
+                } catch (Exception e) {
+                  log.warn("Exception while restoring the backup index ", e);
+                  throw new RuntimeException(
+                      "Exception while restoring the backup index for file: " 
+ filenameFinal, e);
+                }
+              };
+
+          if (executor != null) {
+            downloadFutures.add(executor.submit(downloadTask));
+          } else {
+            // Run synchronously when parallelism is 1
+            try {
+              downloadTask.run();
+            } catch (RuntimeException e) {
+              if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+              }
+              throw e;
+            }
+          }
+        }
+
+        // Wait for all downloads to complete and collect any errors (only if 
using executor)
+        if (executor != null) {
+          // We need to wait for ALL futures to ensure all files are processed
+          Throwable firstError = null;
+          for (Future<?> future : downloadFutures) {
+            try {
+              future.get();
+            } catch (ExecutionException e) {
+              if (firstError == null) {
+                Throwable cause = e.getCause();
+                // Unwrap RuntimeExceptions that wrap the original IOException
+                if (cause instanceof RuntimeException && cause.getCause() != 
null) {
+                  firstError = cause.getCause();
+                } else {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to