This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f848cd06 CASSANDRASC-107: Improve logging for slice restore task (#108)
f848cd06 is described below

commit f848cd063e5e1671c84807615f5eae809253971d
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Thu Mar 21 15:26:06 2024 -0700

    CASSANDRASC-107: Improve logging for slice restore task (#108)
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-107
---
 CHANGES.txt                                        |   3 +-
 .../cassandra/sidecar/restore/RestoreJobUtil.java  |   8 +-
 .../sidecar/restore/RestoreSliceTask.java          | 229 ++++++++++-----------
 .../cassandra/sidecar/restore/StorageClient.java   |  40 +++-
 4 files changed, 143 insertions(+), 137 deletions(-)
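
The bulk of the RestoreSliceTask.java diff below replaces CompletableFuture chaining
(thenCompose/thenAccept/whenComplete) with Vert.x Future composition built on
Future.fromCompletionStage. A minimal sketch of that conversion pattern, assuming Vert.x 4
and using placeholder names (FutureConversionSketch, asyncOperation) that are not part of
the patch:

    import java.util.concurrent.CompletableFuture;

    import io.vertx.core.Future;

    import static io.vertx.core.Future.fromCompletionStage;

    // Illustrative only: wrap the CompletableFuture once, then compose with
    // Vert.x combinators so the success and failure paths read linearly.
    final class FutureConversionSketch
    {
        static Future<Void> run(CompletableFuture<String> asyncOperation)
        {
            return fromCompletionStage(asyncOperation)
                   // replaces thenCompose/thenAccept in the old chain
                   .<Void>compose(result -> Future.succeededFuture())
                   // replaces the success branch of whenComplete
                   .onSuccess(v -> System.out.println("completed"))
                   // replaces the failure branch of whenComplete
                   .onFailure(cause -> System.err.println("failed: " + cause.getMessage()));
        }
    }

The same wrap-then-compose shape appears in checkObjectExistence and downloadSlice further down.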

diff --git a/CHANGES.txt b/CHANGES.txt
index ce39b66c..638ee316 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Improve logging for slice restore task (CASSANDRASC-107)
  * Add restore task watcher to report long running tasks (CASSANDRASC-106)
  * RestoreSliceTask could be stuck due to missing exception handling 
(CASSANDRASC-105)
  * Make hash algorithm implementation pluggable (CASSANDRASC-114)
@@ -82,4 +83,4 @@
  * Add integration tests task (CASSANDRA-15031)
  * Add support for SSL and bindable address (CASSANDRA-15030)
  * Autogenerate API docs for sidecar (CASSANDRA-15028)
- * C* Management process (CASSANDRA-14395)
\ No newline at end of file
+ * C* Management process (CASSANDRA-14395)
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
index 8651d535..be488ae4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java
@@ -78,9 +78,9 @@ public class RestoreJobUtil
     {
         try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath())))
         {
-            ZipEntry zipEntry = zis.getNextEntry();
+            ZipEntry zipEntry;
 
-            while (zipEntry != null)
+            while ((zipEntry = zis.getNextEntry()) != null)
             {
                 // Encounters a directory inside the zip file
                 // It is not expected. The zip file should have the directory depth of 1.
@@ -92,8 +92,6 @@ public class RestoreJobUtil
 
                 File targetFile = newProtectedTargetFile(zipEntry, targetDir);
                 Files.copy(zis, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-
-                zipEntry = zis.getNextEntry();
             }
             zis.closeEntry();
         }
@@ -161,7 +159,7 @@ public class RestoreJobUtil
                 }
                 catch (IOException e)
                 {
-                    LOGGER.error("Unexpected error occurred while cleaning directory {}, ", path, e);
+                    LOGGER.error("Unexpected error occurred while cleaning directory {}", path, e);
                     throw new RuntimeException(e);
                 }
             });
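
The RestoreJobUtil.java change above folds zis.getNextEntry() into the while condition so the
zip entry is advanced and tested in one place. A simplified, self-contained sketch of that
extraction loop (the real method additionally rejects directory entries and resolves the target
through newProtectedTargetFile):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    final class UnzipSketch
    {
        // Illustrative only: mirrors the loop shape adopted in the patch.
        static void unzip(File zipFile, File targetDir) throws IOException
        {
            try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath())))
            {
                ZipEntry zipEntry;
                while ((zipEntry = zis.getNextEntry()) != null)
                {
                    File targetFile = new File(targetDir, zipEntry.getName());
                    Files.copy(zis, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
                }
                zis.closeEntry();
            }
        }
    }
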
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
index ed85d3f9..8c828d89 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.restore;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -48,6 +47,7 @@ import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
+import static io.vertx.core.Future.fromCompletionStage;
 import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage;
 
 /**
@@ -55,7 +55,7 @@ import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSuff
  * and imports SSTables into Cassandra.
  * It the execution ever fails, the cause should only be
  * {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException}
- *
+ * <p>
  * Note that the class is package private, and it is not intended to be referenced by other packages.
  */
 public class RestoreSliceTask implements RestoreSliceHandler
@@ -130,25 +130,17 @@ public class RestoreSliceTask implements RestoreSliceHandler
                     }
 
                     // 1. check object existence and validate eTag / checksum
-                    CompletableFuture<Void> fut = checkObjectExistence(event)
-                    // 2. download slice/object when the remote object exists
-                    .thenCompose(headObject -> downloadSlice(event))
-                    // 3. persist status
-                    .thenAccept(file -> {
-                        slice.completeStagePhase();
-                        sliceDatabaseAccessor.updateStatus(slice);
-                        // completed staging. A new task is produced when it comes to import
-                        event.tryComplete(slice);
-                    })
-                    .whenComplete((x, cause) -> {
-                        if (cause != null)
-                        {
-                            // handle unexpected errors thrown during download slice call, that do not close event
-                            event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause));
-                        }
-                    });
-
-                    return Future.fromCompletionStage(fut);
+                    return checkObjectExistence(event)
+                           .compose(headObject -> downloadSlice(event))
+                           .<Void>compose(file -> {
+                               slice.completeStagePhase();
+                               sliceDatabaseAccessor.updateStatus(slice);
+                               return Future.succeededFuture();
+                           })
+                           // completed staging. A new task is produced when it comes to import
+                           .onSuccess(_v -> event.tryComplete(slice))
+                           // handle unexpected errors thrown during download slice call, that do not close event
+                           .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)));
                 }
                 else if (job.status == RestoreJobStatus.STAGED)
                 {
@@ -177,75 +169,73 @@ public class RestoreSliceTask implements RestoreSliceHandler
     private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event)
     {
         // 1. check object existence and validate eTag / checksum
-        CompletableFuture<File> fut = checkObjectExistence(event)
-        // 2. download slice/object when the remote object exists
-        .thenCompose(headObject -> downloadSlice(event));
-        // 3. unzip the file and import/commit
-        return Future.fromCompletionStage(fut)
-                     .compose(file -> unzipAndImport(event, file));
+        return checkObjectExistence(event)
+               // 2. download slice/object when the remote object exists
+               .compose(headObject -> downloadSlice(event))
+               // 3. unzip the file and import/commit
+               .compose(file -> unzipAndImport(event, file));
     }
 
-    private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event)
+    private Future<?> checkObjectExistence(Promise<RestoreSlice> event)
     {
         // skip query s3 if the object existence is already confirmed
         if (slice.existsOnS3())
         {
-            return CompletableFuture.completedFuture(null);
+            LOGGER.debug("The slice already exists on S3. jobId={} sliceKey={}", slice.jobId(), slice.key());
+            return Future.succeededFuture();
         }
 
-        return s3Client
-        .objectExists(slice) // even if the file already exists on disk, we should still check the object existence
-        .whenComplete((resp, cause) -> {
-            if (cause == null)
-            {
-                stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos());
-                slice.setExistsOnS3();
-                return;
-            }
-
+        // even if the file already exists on disk, we should still check the object existence
+        return
+        fromCompletionStage(s3Client.objectExists(slice))
+        .onSuccess(exists -> {
+            long durationNanos = currentTimeInNanos() - slice.creationTimeNanos();
+            stats.captureSliceReplicationTime(durationNanos);
+            slice.setExistsOnS3();
+            LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}",
+                         slice.jobId(), slice.key(), durationNanos);
+        })
+        .onFailure(cause -> {
             S3Exception s3Exception = ThrowableUtils.getCause(cause, S3Exception.class);
             if (s3Exception == null) // has non-null cause, but not S3Exception
             {
                 event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected error when checking object existence",
                                                                 slice, cause));
             }
+            else if (s3Exception instanceof NoSuchKeyException)
+            {
+                event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
+            }
+            else if (s3Exception.statusCode() == 412)
+            {
+                // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
+                // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see,
+                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
+                                                                slice, s3Exception));
+                stats.captureSliceChecksumMismatch(slice.owner().id());
+            }
+            else if (s3Exception.statusCode() == 403)
+            {
+                // Fail immediately if 403 forbidden is returned.
+                // There might be permission issue on accessing the object.
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
+                                                                slice, s3Exception));
+                stats.captureTokenUnauthorized();
+            }
+            else if (s3Exception.statusCode() == 400 &&
+                     s3Exception.getMessage().contains("token has expired"))
+            {
+                // Fail the job if 400, token has expired.
+                // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
+                event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
+                stats.captureTokenExpired();
+            }
             else
             {
-                if (s3Exception instanceof NoSuchKeyException)
-                {
-                    event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null));
-                }
-                else if (s3Exception.statusCode() == 412)
-                {
-                    // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately.
-                    // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see,
-                    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched",
-                                                                    slice, s3Exception));
-                    stats.captureSliceChecksumMismatch(slice.owner().id());
-                }
-                else if (s3Exception.statusCode() == 403)
-                {
-                    // Fail immediately if 403 forbidden is returned.
-                    // There might be permission issue on accessing the object.
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden",
-                                                                    slice, s3Exception));
-                    stats.captureTokenUnauthorized();
-                }
-                else if (s3Exception.statusCode() == 400 &&
-                         s3Exception.getMessage().contains("token has expired"))
-                {
-                    // Fail the job if 400, token has expired.
-                    // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
-                    event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception));
-                    stats.captureTokenExpired();
-                }
-                else
-                {
-                    // Retry the other S3Exceptions
-                    event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
-                                                               slice, s3Exception));
-                }
+                // Retry the other S3Exceptions
+                event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence",
+                                                           slice, s3Exception));
             }
         });
     }
@@ -255,16 +245,14 @@ public class RestoreSliceTask implements RestoreSliceHandler
         return restoreJobUtil.currentTimeNanos();
     }
 
-    private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event)
+    private Future<File> downloadSlice(Promise<RestoreSlice> event)
     {
         if (slice.isCancelled())
         {
             RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
                                                                             slice, null);
             event.tryFail(ex);
-            CompletableFuture<File> failedFuture = new CompletableFuture<>();
-            failedFuture.completeExceptionally(ex);
-            return failedFuture;
+            return Future.failedFuture(ex);
         }
 
         if (slice.downloadAttempt() > 0)
@@ -274,20 +262,17 @@ public class RestoreSliceTask implements RestoreSliceHandler
         }
 
         LOGGER.info("Begin downloading restore slice. sliceKey={}", slice.key());
-        CompletableFuture<File> future = s3Client
-        .downloadObjectIfAbsent(slice)
-        .whenComplete((file, cause) -> {
-            if (cause != null)
+        Future<File> future =
+        fromCompletionStage(s3Client.downloadObjectIfAbsent(slice))
+        .onFailure(cause -> {
+            slice.incrementDownloadAttempt();
+            if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
             {
-                slice.incrementDownloadAttempt();
-                if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null)
-                {
-                    LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
-                    stats.captureSliceDownloadTimeout(slice.owner().id());
-                }
-                event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
-                                                                slice, cause));
+                LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key());
+                stats.captureSliceDownloadTimeout(slice.owner().id());
             }
+            event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object",
+                                                            slice, cause));
         });
 
         return Timer.measureTimeTaken(future, duration -> {
@@ -315,28 +300,28 @@ public class RestoreSliceTask implements RestoreSliceHandler
 
         // run the rest in the executor pool, instead of S3 client threadpool
         return unzip(file)
-        .compose(this::validateFiles)
-        .compose(this::commit)
-        .compose(x -> {
-            if (onSuccessCommit == null)
-            {
-                return Future.succeededFuture();
-            }
-
-            return executorPool.<Void>executeBlocking(promise -> {
-                onSuccessCommit.run();
-                promise.tryComplete();
-            });
-        })
-        .onSuccess(x -> {
-            slice.completeImportPhase();
-            event.tryComplete(slice);
-        })
-        .onFailure(failure -> {
-            logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
-            event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
-                                                         + slice.shortDescription(), failure));
-        });
+               .compose(this::validateFiles)
+               .compose(this::commit)
+               .compose(x -> {
+                   if (onSuccessCommit == null)
+                   {
+                       return Future.succeededFuture();
+                   }
+
+                   return executorPool.<Void>executeBlocking(promise -> {
+                       onSuccessCommit.run();
+                       promise.tryComplete();
+                   });
+               })
+               .onSuccess(x -> {
+                   slice.completeImportPhase();
+                   event.tryComplete(slice);
+               })
+               .onFailure(failure -> {
+                   logWarnIfHasHttpExceptionCauseOnCommit(failure, slice);
+                   event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. "
+                                                                + slice.shortDescription(), failure));
+               });
     }
 
     private Future<File> unzip(File zipFile)
@@ -358,7 +343,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
             {
                 if (targetDirExist)
                 {
-                    LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task?");
+                    LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " +
+                                 "jobId={} sliceKey={}", slice.jobId(), slice.key());
                     promise.complete(targetDir);
                 }
                 else
@@ -381,8 +367,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
                 // Then, delete the downloaded zip file
                 if (!zipFile.delete())
                 {
-                    LOGGER.warn("File deletion attempt failed. file={}",
-                                zipFile.getAbsolutePath());
+                    LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}",
+                                slice.jobId(), slice.key(), zipFile.getAbsolutePath());
                 }
             }
             catch (Exception cause)
@@ -487,7 +473,7 @@ public class RestoreSliceTask implements RestoreSliceHandler
             return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled",
                                                                          slice, null));
 
-        LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key());
+        LOGGER.info("Begin committing SSTables. jobId={} sliceKey={}", slice.jobId(), slice.key());
 
         SSTableImportOptions options = slice.job().importOptions;
         SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder()
@@ -505,8 +491,8 @@ public class RestoreSliceTask implements RestoreSliceHandler
                                                       .uploadId(slice.uploadId())
                                                       .build();
         Future<Void> future = importer.scheduleImport(importOptions)
-                                      .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. sliceKey={}",
-                                                                        slice.key()));
+                                      .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. jobId={} sliceKey={}",
+                                                                        slice.jobId(), slice.key()));
         return Timer.measureTimeTaken(future, d -> stats.captureSliceImportTime(slice.owner().id(), d));
     }
 
@@ -529,15 +515,16 @@ public class RestoreSliceTask implements RestoreSliceHandler
             return;
         }
 
-        LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}",
-                    slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException);
+        LOGGER.warn("Committing slice failed with HttpException. jobId={} sliceKey={} statusCode={} " +
+                    "exceptionPayload={}", slice.jobId(), slice.key(), httpException.getStatusCode(),
+                    httpException.getPayload(), httpException);
     }
 
     @Override
     public long elapsedInNanos()
     {
         return taskStartTimeNanos == -1 ? -1 :
-        currentTimeInNanos() - taskStartTimeNanos;
+               currentTimeInNanos() - taskStartTimeNanos;
     }
 
     @Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index 9bf4f0e5..2795b7b5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -105,6 +105,8 @@ public class StorageClient
     /**
      * Revoke the credentials of a {@link RestoreJob}
      * It should be called when the job is in a final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus}
+     *
+     * @param jobId the unique identifier for the job
      */
     public void revokeCredentials(UUID jobId)
     {
@@ -132,7 +134,7 @@ public class StorageClient
                          .build();
 
         return client.headObject(request)
-                     .whenComplete(logCredentialOnRequestFailure(credentials));
+                     .whenComplete(logCredentialOnRequestFailure(slice, credentials));
     }
 
     public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice)
@@ -140,6 +142,7 @@ public class StorageClient
         Credentials credentials = credentialsProviders.get(slice.jobId());
         if (credentials == null)
         {
+            LOGGER.debug("Credentials to download object not found. jobId={}", slice.jobId());
             CompletableFuture<File> failedFuture = new CompletableFuture<>();
             failedFuture.completeExceptionally(credentialsNotFound(slice));
             return failedFuture;
@@ -156,6 +159,8 @@ public class StorageClient
         File object = objectPath.toFile();
         if (object.exists())
         {
+            LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+                         slice.jobId(), slice.stagedObjectPath());
             // Skip downloading if the file already exists on disk. It should be a rare scenario.
             // Note that the on-disk file could be different from the remote object, although the name matches.
             // TODO 1: verify etag does not change after s3 replication and batch copy
@@ -166,10 +171,12 @@ public class StorageClient
         }
         if (!object.getParentFile().mkdirs())
         {
-            LOGGER.warn("Error occurred while creating directory for S3 object {}", objectPath);
+            LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}",
+                        slice.jobId(), slice.stagedObjectPath());
         }
-        return rateLimitedGetObject(client, request, objectPath)
-               .whenComplete(logCredentialOnRequestFailure(credentials))
+        LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath());
+        return rateLimitedGetObject(slice, client, request, objectPath)
+               .whenComplete(logCredentialOnRequestFailure(slice, credentials))
                .thenApply(res -> object);
     }
 
@@ -206,13 +213,14 @@ public class StorageClient
                                          "jobId: " + slice.jobId());
     }
 
-    private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(Credentials credentials)
+    private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(RestoreSlice slice,
+                                                                                Credentials credentials)
     {
         return (ignored, cause) -> {
             if (cause != null)
             {
-                LOGGER.error("GetObjectRequest is not successful. credentials={}",
-                             credentials.readCredentials, cause);
+                LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}",
+                             slice.jobId(), credentials.readCredentials, cause);
             }
         };
     }
@@ -221,17 +229,21 @@ public class StorageClient
      * Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file
      * applying rate limiting on the download throughput.
      *
+     * @param slice           the slice to be restored
      * @param client          the S3 client
      * @param request         the {@link GetObjectRequest request}
      * @param destinationPath the path where the object will be persisted
      * @return a {@link CompletableFuture} of the {@link GetObjectResponse}
      */
-    private CompletableFuture<GetObjectResponse> rateLimitedGetObject(S3AsyncClient client,
+    private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreSlice slice,
+                                                                      S3AsyncClient client,
                                                                       GetObjectRequest request,
                                                                       Path destinationPath)
     {
         return client.getObject(request, AsyncResponseTransformer.toPublisher())
-                     .thenCompose(responsePublisher -> subscribeRateLimitedWrite(destinationPath, responsePublisher));
+                     .thenCompose(responsePublisher -> subscribeRateLimitedWrite(slice,
+                                                                                 destinationPath,
+                                                                                 responsePublisher));
     }
 
     /**
@@ -239,11 +251,13 @@ public class StorageClient
      * by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting
      * the download throughput using the {@code downloadRateLimiter} object.
      *
+     * @param slice           the slice to be restored
      * @param destinationPath the path where the object will be persisted
      * @param publisher       the {@link ResponsePublisher}
      * @return a {@link CompletableFuture} to the {@link GetObjectResponse}
      */
-    CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path destinationPath,
+    CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreSlice slice,
+                                                                   Path destinationPath,
                                                                    ResponsePublisher<GetObjectResponse> publisher)
     {
         WritableByteChannel channel;
@@ -257,10 +271,14 @@ public class StorageClient
         }
         catch (FileAlreadyExistsException fileAlreadyExistsException)
         {
+            LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}",
+                         slice.jobId(), slice.stagedObjectPath());
             return CompletableFuture.completedFuture(publisher.response());
         }
         catch (IOException e)
         {
+            LOGGER.error("Error occurred while creating channel. destinationPath={} jobId={} s3_object={}",
+                         destinationPath, slice.jobId(), slice.stagedObjectPath(), e);
             throw new RuntimeException(e);
         }
         // CompletableFuture that will be notified when all events have been consumed or if an error occurs.
@@ -272,6 +290,8 @@ public class StorageClient
             }
             catch (IOException e)
             {
+                LOGGER.error("Error occurred while downloading. jobId={} s3_object={}",
+                             slice.jobId(), slice.stagedObjectPath(), e);
                 throw new RuntimeException(e);
             }
         }).whenComplete((v, subscribeThrowable) -> closeChannel(channel));
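
The subscribeRateLimitedWrite javadoc above describes writing the S3 object to disk while applying
backpressure through the downloadRateLimiter. A hedged sketch of that shape, assuming a Guava
RateLimiter charged per received byte (the patch does not show the limiter's actual type) and the
AWS SDK's publisher-to-consumer subscription:

    import java.io.IOException;
    import java.nio.channels.WritableByteChannel;
    import java.util.concurrent.CompletableFuture;

    import com.google.common.util.concurrent.RateLimiter;
    import software.amazon.awssdk.core.async.ResponsePublisher;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    final class RateLimitedWriteSketch
    {
        // Illustrative only: acquire permits for the bytes received, write the
        // buffer to the channel, and resolve with the response once the
        // publisher signals completion.
        static CompletableFuture<GetObjectResponse> write(ResponsePublisher<GetObjectResponse> publisher,
                                                          WritableByteChannel channel,
                                                          RateLimiter downloadRateLimiter)
        {
            return publisher.subscribe(buffer -> {
                downloadRateLimiter.acquire(Math.max(1, buffer.remaining()));
                try
                {
                    while (buffer.hasRemaining())
                    {
                        channel.write(buffer);
                    }
                }
                catch (IOException e)
                {
                    throw new RuntimeException(e);
                }
            }).thenApply(ignored -> publisher.response());
        }
    }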


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
