mlbiscoc commented on code in PR #4023:
URL: https://github.com/apache/solr/pull/4023#discussion_r3197423594
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
+ ExecutorService executor = BACKUP_EXECUTOR;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ uploadFutures.add(executor.submit(uploadTask));
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for ALL futures before throwing - currentBackupPoint must
reflect every
+ // successfully uploaded file before it is written, even when an error
occurs.
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ firstError = e.getCause();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
Review Comment:
If any of them fail, you need to cancel the remaining futures here with
`future.cancel(true)` to stop the rest of the queued jobs from executing. I
believe all this currently does is interrupt the calling thread.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
+ ExecutorService executor = BACKUP_EXECUTOR;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ uploadFutures.add(executor.submit(uploadTask));
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for ALL futures before throwing - currentBackupPoint must
reflect every
+ // successfully uploaded file before it is written, even when an error
occurs.
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ firstError = e.getCause();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (firstError == null) {
+ firstError = e;
+ }
+ }
+ }
- currentBackupPoint.addBackedFile(backedFileName, fileName,
originalFileCS);
- backupStats.uploadedFile(originalFileCS);
+ if (firstError != null) {
+ if (firstError instanceof Error) {
+ // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+ throw (Error) firstError;
+ } else if (firstError instanceof IOException) {
+ throw (IOException) firstError;
+ } else if (firstError instanceof RuntimeException) {
+ throw (RuntimeException) firstError;
+ } else if (firstError instanceof InterruptedException) {
+ throw new IOException("Backup interrupted", firstError);
+ } else {
+ throw new IOException("Error during parallel backup upload",
firstError);
+ }
+ }
+ } finally {
Review Comment:
Just remove this finally since it does nothing now.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -52,6 +62,21 @@
*/
public class IncrementalShardBackup {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Maximum number of files to upload in parallel during backup. Can be
configured via the system
+ * property {@code solr.backup.maxparalleluploads} or environment variable
{@code
+ * SOLR_BACKUP_MAXPARALLELUPLOADS}.
+ */
+ private static final int DEFAULT_MAX_PARALLEL_UPLOADS =
Review Comment:
This should just be `MAX_PARALLEL_UPLOADS`; drop the `DEFAULT_` prefix.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
+ ExecutorService executor = BACKUP_EXECUTOR;
Review Comment:
Just use `BACKUP_EXECUTOR` instead of doing this.
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
+ ExecutorService executor = BACKUP_EXECUTOR;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ uploadFutures.add(executor.submit(uploadTask));
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for ALL futures before throwing - currentBackupPoint must
reflect every
+ // successfully uploaded file before it is written, even when an error
occurs.
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ firstError = e.getCause();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (firstError == null) {
+ firstError = e;
+ }
+ }
+ }
- currentBackupPoint.addBackedFile(backedFileName, fileName,
originalFileCS);
- backupStats.uploadedFile(originalFileCS);
+ if (firstError != null) {
+ if (firstError instanceof Error) {
+ // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+ throw (Error) firstError;
+ } else if (firstError instanceof IOException) {
+ throw (IOException) firstError;
+ } else if (firstError instanceof RuntimeException) {
+ throw (RuntimeException) firstError;
+ } else if (firstError instanceof InterruptedException) {
+ throw new IOException("Backup interrupted", firstError);
+ } else {
+ throw new IOException("Error during parallel backup upload",
firstError);
+ }
Review Comment:
In Java 21 you can use a pattern-matching `switch` here instead of the `instanceof` chain.
##########
solr/solr-ref-guide/modules/deployment-guide/pages/backup-restore.adoc:
##########
@@ -396,6 +396,39 @@ Any children under the `<repository>` tag are passed as
additional configuration
Information on each of the repository implementations provided with Solr is
provided below.
+=== Parallel File Transfers
+
+Backup and restore operations can transfer multiple index files in parallel to
improve throughput, especially when using cloud storage repositories like S3 or
GCS where latency is higher.
+The parallelism is controlled via system properties or environment variables:
+
+`solr.backup.maxparalleluploads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to upload in parallel during backup operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELUPLOADS` environment variable.
+Increasing this value can significantly improve backup throughput when using
cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth
pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+
+`solr.backup.maxparalleldownloads`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `1`
+|===
++
+Maximum number of index files to download in parallel during restore
operations.
+Can also be set via the `SOLR_BACKUP_MAXPARALLELDOWNLOADS` environment
variable.
+Increasing this value can significantly improve restore throughput when using
cloud storage (S3, GCS), but too high a value will increase IOPS and bandwidth
pressure on your cluster.
+Start small and increase based on observed throughput and available resources.
+
+TIP: Both settings share a single global thread pool per property, so the
configured limit applies across all concurrent backup or restore operations on
the node.
Review Comment:
I misread this the first time as saying that backup and restore share a single
pool, which contradicted the two separate settings. Can you rewrite this to
clearly state that these are 2 separate thread pools?
##########
solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java:
##########
@@ -191,55 +216,115 @@ private BackupStats incrementalCopy(Collection<String>
indexFiles, Directory dir
URI indexDir = incBackupFiles.getIndexDir();
BackupStats backupStats = new BackupStats();
- for (String fileName : indexFiles) {
- Optional<ShardBackupMetadata.BackedFile> opBackedFile =
oldBackupPoint.getFile(fileName);
- Checksum originalFileCS = backupRepo.checksum(dir, fileName);
-
- if (opBackedFile.isPresent()) {
- ShardBackupMetadata.BackedFile backedFile = opBackedFile.get();
- Checksum existedFileCS = backedFile.fileChecksum;
- if (existedFileCS.equals(originalFileCS)) {
- currentBackupPoint.addBackedFile(opBackedFile.get());
- backupStats.skippedUploadingFile(existedFileCS);
- continue;
- }
+ ExecutorService executor = BACKUP_EXECUTOR;
+
+ List<Future<?>> uploadFutures = new ArrayList<>();
+
+ try {
+ for (String fileName : indexFiles) {
+ // Capture variable for lambda
+ final String fileNameFinal = fileName;
+
+ Runnable uploadTask =
+ () -> {
+ try {
+ // Calculate checksum and check if file already exists in
previous backup
+ Optional<ShardBackupMetadata.BackedFile> opBackedFile =
+ oldBackupPoint.getFile(fileNameFinal);
+ Checksum originalFileCS = backupRepo.checksum(dir,
fileNameFinal);
+
+ if (opBackedFile.isPresent()) {
+ ShardBackupMetadata.BackedFile backedFile =
opBackedFile.get();
+ Checksum existedFileCS = backedFile.fileChecksum;
+ if (existedFileCS.equals(originalFileCS)) {
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(opBackedFile.get());
+ }
+ backupStats.skippedUploadingFile(existedFileCS);
+ return;
+ }
+ }
+
+ // File doesn't exist or has changed - upload it
+ String backedFileName = UUID.randomUUID().toString();
+ backupRepo.copyIndexFileFrom(dir, fileNameFinal, indexDir,
backedFileName);
+
+ synchronized (currentBackupPoint) {
+ currentBackupPoint.addBackedFile(backedFileName,
fileNameFinal, originalFileCS);
+ }
+ backupStats.uploadedFile(originalFileCS);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process file: " +
fileNameFinal, e);
+ }
+ };
+
+ uploadFutures.add(executor.submit(uploadTask));
}
- String backedFileName = UUID.randomUUID().toString();
- backupRepo.copyIndexFileFrom(dir, fileName, indexDir, backedFileName);
+ // Wait for ALL futures before throwing - currentBackupPoint must
reflect every
+ // successfully uploaded file before it is written, even when an error
occurs.
+ Throwable firstError = null;
+ for (Future<?> future : uploadFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (firstError == null) {
+ firstError = e.getCause();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (firstError == null) {
+ firstError = e;
+ }
+ }
+ }
- currentBackupPoint.addBackedFile(backedFileName, fileName,
originalFileCS);
- backupStats.uploadedFile(originalFileCS);
+ if (firstError != null) {
+ if (firstError instanceof Error) {
+ // Rethrow Errors (like OutOfMemoryError) - don't try to recover
+ throw (Error) firstError;
+ } else if (firstError instanceof IOException) {
+ throw (IOException) firstError;
+ } else if (firstError instanceof RuntimeException) {
+ throw (RuntimeException) firstError;
+ } else if (firstError instanceof InterruptedException) {
+ throw new IOException("Backup interrupted", firstError);
+ } else {
+ throw new IOException("Error during parallel backup upload",
firstError);
Review Comment:
You threw a `SolrException` in RestoreCore but throw an `IOException` here; these should be consistent.
##########
solr/core/src/java/org/apache/solr/handler/RestoreCore.java:
##########
@@ -107,34 +125,140 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new
HashSet<>(Arrays.asList(indexDir.listAll()));
- // Move all files from backupDir to restoreIndexDir
- for (String filename : repository.listAllFiles()) {
- checkInterrupted();
- try {
- if (indexDirFiles.contains(filename)) {
- Checksum cs = repository.checksum(filename);
- IndexFetcher.CompareResult compareResult;
- if (cs == null) {
- compareResult = new IndexFetcher.CompareResult();
- compareResult.equal = false;
- } else {
- compareResult = IndexFetcher.compareFile(indexDir, filename,
cs.size, cs.checksum);
+
+ // Capture directories as final for lambda access
+ final Directory finalIndexDir = indexDir;
+ final Directory finalRestoreIndexDir = restoreIndexDir;
+
+ // Only use an executor for parallel downloads when parallelism > 1
+ // When set to 1, run synchronously to avoid thread-local state issues
with CallerRunsPolicy
+ int maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
+ ExecutorService executor =
+ maxParallelDownloads > 1
+ ? new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ 0,
+ maxParallelDownloads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new SolrNamedThreadFactory("RestoreCore"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
Review Comment:
Thanks! I see you dropped `CallerRunsPolicy`. I kind of liked it because it
lets the calling thread absorb some of the backpressure, but I can see an
argument for dropping it since it is harder to control. Also, once this is
released with the default set to 1, backup/restore will be slower for anyone
who upgrades without knowing about this change, because the calling thread no
longer does any of the backup work itself. @dsmiley wdyt?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]