This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new f848cd06 CASSANDRASC-107: Improve logging for slice restore task (#108) f848cd06 is described below commit f848cd063e5e1671c84807615f5eae809253971d Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Thu Mar 21 15:26:06 2024 -0700 CASSANDRASC-107: Improve logging for slice restore task (#108) Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-107 --- CHANGES.txt | 3 +- .../cassandra/sidecar/restore/RestoreJobUtil.java | 8 +- .../sidecar/restore/RestoreSliceTask.java | 229 ++++++++++----------- .../cassandra/sidecar/restore/StorageClient.java | 40 +++- 4 files changed, 143 insertions(+), 137 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index ce39b66c..638ee316 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Improve logging for slice restore task (CASSANDRASC-107) * Add restore task watcher to report long running tasks (CASSANDRASC-106) * RestoreSliceTask could be stuck due to missing exception handling (CASSANDRASC-105) * Make hash algorithm implementation pluggable (CASSANDRASC-114) @@ -82,4 +83,4 @@ * Add integration tests task (CASSANDRA-15031) * Add support for SSL and bindable address (CASSANDRA-15030) * Autogenerate API docs for sidecar (CASSANDRA-15028) - * C* Management process (CASSANDRA-14395) \ No newline at end of file + * C* Management process (CASSANDRA-14395) diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java index 8651d535..be488ae4 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobUtil.java @@ -78,9 +78,9 @@ public class RestoreJobUtil { try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile.toPath()))) { - ZipEntry zipEntry = zis.getNextEntry(); + ZipEntry zipEntry; - while (zipEntry != null) + while ((zipEntry = zis.getNextEntry()) != null) { // Encounters a directory inside the zip file // It is not expected. The zip file should have the directory depth of 1. @@ -92,8 +92,6 @@ public class RestoreJobUtil File targetFile = newProtectedTargetFile(zipEntry, targetDir); Files.copy(zis, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - - zipEntry = zis.getNextEntry(); } zis.closeEntry(); } @@ -161,7 +159,7 @@ public class RestoreJobUtil } catch (IOException e) { - LOGGER.error("Unexpected error occurred while cleaning directory {}, ", path, e); + LOGGER.error("Unexpected error occurred while cleaning directory {}", path, e); throw new RuntimeException(e); } }); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java index ed85d3f9..8c828d89 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreSliceTask.java @@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; import java.nio.file.Files; import java.util.Map; -import java.util.concurrent.CompletableFuture; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -48,6 +47,7 @@ import software.amazon.awssdk.core.exception.ApiCallTimeoutException; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.S3Exception; +import static io.vertx.core.Future.fromCompletionStage; import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSufficientStorage; /** @@ -55,7 +55,7 @@ import static org.apache.cassandra.sidecar.utils.AsyncFileSystemUtils.ensureSuff * and imports SSTables into Cassandra. * It the execution ever fails, the cause should only be * {@link org.apache.cassandra.sidecar.exceptions.RestoreJobException} - * + * <p> * Note that the class is package private, and it is not intended to be referenced by other packages. */ public class RestoreSliceTask implements RestoreSliceHandler @@ -130,25 +130,17 @@ public class RestoreSliceTask implements RestoreSliceHandler } // 1. check object existence and validate eTag / checksum - CompletableFuture<Void> fut = checkObjectExistence(event) - // 2. download slice/object when the remote object exists - .thenCompose(headObject -> downloadSlice(event)) - // 3. persist status - .thenAccept(file -> { - slice.completeStagePhase(); - sliceDatabaseAccessor.updateStatus(slice); - // completed staging. A new task is produced when it comes to import - event.tryComplete(slice); - }) - .whenComplete((x, cause) -> { - if (cause != null) - { - // handle unexpected errors thrown during download slice call, that do not close event - event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause)); - } - }); - - return Future.fromCompletionStage(fut); + return checkObjectExistence(event) + .compose(headObject -> downloadSlice(event)) + .<Void>compose(file -> { + slice.completeStagePhase(); + sliceDatabaseAccessor.updateStatus(slice); + return Future.succeededFuture(); + }) + // completed staging. A new task is produced when it comes to import + .onSuccess(_v -> event.tryComplete(slice)) + // handle unexpected errors thrown during download slice call, that do not close event + .onFailure(cause -> event.tryFail(RestoreJobExceptions.ofSlice(cause.getMessage(), slice, cause))); } else if (job.status == RestoreJobStatus.STAGED) { @@ -177,75 +169,73 @@ public class RestoreSliceTask implements RestoreSliceHandler private Future<Void> downloadSliceAndImport(Promise<RestoreSlice> event) { // 1. check object existence and validate eTag / checksum - CompletableFuture<File> fut = checkObjectExistence(event) - // 2. download slice/object when the remote object exists - .thenCompose(headObject -> downloadSlice(event)); - // 3. unzip the file and import/commit - return Future.fromCompletionStage(fut) - .compose(file -> unzipAndImport(event, file)); + return checkObjectExistence(event) + // 2. download slice/object when the remote object exists + .compose(headObject -> downloadSlice(event)) + // 3. unzip the file and import/commit + .compose(file -> unzipAndImport(event, file)); } - private CompletableFuture<?> checkObjectExistence(Promise<RestoreSlice> event) + private Future<?> checkObjectExistence(Promise<RestoreSlice> event) { // skip query s3 if the object existence is already confirmed if (slice.existsOnS3()) { - return CompletableFuture.completedFuture(null); + LOGGER.debug("The slice already exists on S3. jobId={} sliceKey={}", slice.jobId(), slice.key()); + return Future.succeededFuture(); } - return s3Client - .objectExists(slice) // even if the file already exists on disk, we should still check the object existence - .whenComplete((resp, cause) -> { - if (cause == null) - { - stats.captureSliceReplicationTime(currentTimeInNanos() - slice.creationTimeNanos()); - slice.setExistsOnS3(); - return; - } - + // even if the file already exists on disk, we should still check the object existence + return + fromCompletionStage(s3Client.objectExists(slice)) + .onSuccess(exists -> { + long durationNanos = currentTimeInNanos() - slice.creationTimeNanos(); + stats.captureSliceReplicationTime(durationNanos); + slice.setExistsOnS3(); + LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}", + slice.jobId(), slice.key(), durationNanos); + }) + .onFailure(cause -> { S3Exception s3Exception = ThrowableUtils.getCause(cause, S3Exception.class); if (s3Exception == null) // has non-null cause, but not S3Exception { event.tryFail(RestoreJobExceptions.ofFatalSlice("Unexpected error when checking object existence", slice, cause)); } + else if (s3Exception instanceof NoSuchKeyException) + { + event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null)); + } + else if (s3Exception.statusCode() == 412) + { + // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately. + // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see, + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax + event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched", + slice, s3Exception)); + stats.captureSliceChecksumMismatch(slice.owner().id()); + } + else if (s3Exception.statusCode() == 403) + { + // Fail immediately if 403 forbidden is returned. + // There might be permission issue on accessing the object. + event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden", + slice, s3Exception)); + stats.captureTokenUnauthorized(); + } + else if (s3Exception.statusCode() == 400 && + s3Exception.getMessage().contains("token has expired")) + { + // Fail the job if 400, token has expired. + // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList + event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception)); + stats.captureTokenExpired(); + } else { - if (s3Exception instanceof NoSuchKeyException) - { - event.tryFail(RestoreJobExceptions.ofSlice("Object not found", slice, null)); - } - else if (s3Exception.statusCode() == 412) - { - // When checksum/eTag does not match, it should be an unrecoverable error and fail immediately. - // For such scenario, we expect "S3Exception: (Status Code: 412)". Also see, - // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax - event.tryFail(RestoreJobExceptions.ofFatalSlice("Object checksum mismatched", - slice, s3Exception)); - stats.captureSliceChecksumMismatch(slice.owner().id()); - } - else if (s3Exception.statusCode() == 403) - { - // Fail immediately if 403 forbidden is returned. - // There might be permission issue on accessing the object. - event.tryFail(RestoreJobExceptions.ofFatalSlice("Object access is forbidden", - slice, s3Exception)); - stats.captureTokenUnauthorized(); - } - else if (s3Exception.statusCode() == 400 && - s3Exception.getMessage().contains("token has expired")) - { - // Fail the job if 400, token has expired. - // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList - event.tryFail(RestoreJobExceptions.ofFatalSlice("Token has expired", slice, s3Exception)); - stats.captureTokenExpired(); - } - else - { - // Retry the other S3Exceptions - event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence", - slice, s3Exception)); - } + // Retry the other S3Exceptions + event.tryFail(RestoreJobExceptions.ofSlice("Unable to check object existence", + slice, s3Exception)); } }); } @@ -255,16 +245,14 @@ public class RestoreSliceTask implements RestoreSliceHandler return restoreJobUtil.currentTimeNanos(); } - private CompletableFuture<File> downloadSlice(Promise<RestoreSlice> event) + private Future<File> downloadSlice(Promise<RestoreSlice> event) { if (slice.isCancelled()) { RestoreJobFatalException ex = RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", slice, null); event.tryFail(ex); - CompletableFuture<File> failedFuture = new CompletableFuture<>(); - failedFuture.completeExceptionally(ex); - return failedFuture; + return Future.failedFuture(ex); } if (slice.downloadAttempt() > 0) @@ -274,20 +262,17 @@ public class RestoreSliceTask implements RestoreSliceHandler } LOGGER.info("Begin downloading restore slice. sliceKey={}", slice.key()); - CompletableFuture<File> future = s3Client - .downloadObjectIfAbsent(slice) - .whenComplete((file, cause) -> { - if (cause != null) + Future<File> future = + fromCompletionStage(s3Client.downloadObjectIfAbsent(slice)) + .onFailure(cause -> { + slice.incrementDownloadAttempt(); + if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null) { - slice.incrementDownloadAttempt(); - if (ThrowableUtils.getCause(cause, ApiCallTimeoutException.class) != null) - { - LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key()); - stats.captureSliceDownloadTimeout(slice.owner().id()); - } - event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object", - slice, cause)); + LOGGER.warn("Downloading restore slice times out. sliceKey={}", slice.key()); + stats.captureSliceDownloadTimeout(slice.owner().id()); } + event.tryFail(RestoreJobExceptions.ofFatalSlice("Unrecoverable error when downloading object", + slice, cause)); }); return Timer.measureTimeTaken(future, duration -> { @@ -315,28 +300,28 @@ public class RestoreSliceTask implements RestoreSliceHandler // run the rest in the executor pool, instead of S3 client threadpool return unzip(file) - .compose(this::validateFiles) - .compose(this::commit) - .compose(x -> { - if (onSuccessCommit == null) - { - return Future.succeededFuture(); - } - - return executorPool.<Void>executeBlocking(promise -> { - onSuccessCommit.run(); - promise.tryComplete(); - }); - }) - .onSuccess(x -> { - slice.completeImportPhase(); - event.tryComplete(slice); - }) - .onFailure(failure -> { - logWarnIfHasHttpExceptionCauseOnCommit(failure, slice); - event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. " - + slice.shortDescription(), failure)); - }); + .compose(this::validateFiles) + .compose(this::commit) + .compose(x -> { + if (onSuccessCommit == null) + { + return Future.succeededFuture(); + } + + return executorPool.<Void>executeBlocking(promise -> { + onSuccessCommit.run(); + promise.tryComplete(); + }); + }) + .onSuccess(x -> { + slice.completeImportPhase(); + event.tryComplete(slice); + }) + .onFailure(failure -> { + logWarnIfHasHttpExceptionCauseOnCommit(failure, slice); + event.tryFail(RestoreJobExceptions.propagate("Fail to commit slice. " + + slice.shortDescription(), failure)); + }); } private Future<File> unzip(File zipFile) @@ -358,7 +343,8 @@ public class RestoreSliceTask implements RestoreSliceHandler { if (targetDirExist) { - LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task?"); + LOGGER.debug("The files in slice are already extracted. Maybe it is a retried task? " + + "jobId={} sliceKey={}", slice.jobId(), slice.key()); promise.complete(targetDir); } else @@ -381,8 +367,8 @@ public class RestoreSliceTask implements RestoreSliceHandler // Then, delete the downloaded zip file if (!zipFile.delete()) { - LOGGER.warn("File deletion attempt failed. file={}", - zipFile.getAbsolutePath()); + LOGGER.warn("File deletion attempt failed. jobId={} sliceKey={} file={}", + slice.jobId(), slice.key(), zipFile.getAbsolutePath()); } } catch (Exception cause) @@ -487,7 +473,7 @@ public class RestoreSliceTask implements RestoreSliceHandler return Future.failedFuture(RestoreJobExceptions.ofFatalSlice("Restore slice is cancelled", slice, null)); - LOGGER.info("Begin committing SSTables. sliceKey={}", slice.key()); + LOGGER.info("Begin committing SSTables. jobId={} sliceKey={}", slice.jobId(), slice.key()); SSTableImportOptions options = slice.job().importOptions; SSTableImporter.ImportOptions importOptions = new SSTableImporter.ImportOptions.Builder() @@ -505,8 +491,8 @@ public class RestoreSliceTask implements RestoreSliceHandler .uploadId(slice.uploadId()) .build(); Future<Void> future = importer.scheduleImport(importOptions) - .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. sliceKey={}", - slice.key())); + .onSuccess(ignored -> LOGGER.info("Finish committing SSTables. jobId={} sliceKey={}", + slice.jobId(), slice.key())); return Timer.measureTimeTaken(future, d -> stats.captureSliceImportTime(slice.owner().id(), d)); } @@ -529,15 +515,16 @@ public class RestoreSliceTask implements RestoreSliceHandler return; } - LOGGER.warn("Committing slice failed with HttpException. slice={} statusCode={} exceptionPayload={}", - slice.sliceId(), httpException.getStatusCode(), httpException.getPayload(), httpException); + LOGGER.warn("Committing slice failed with HttpException. jobId={} sliceKey={} statusCode={} " + + "exceptionPayload={}", slice.jobId(), slice.key(), httpException.getStatusCode(), + httpException.getPayload(), httpException); } @Override public long elapsedInNanos() { return taskStartTimeNanos == -1 ? -1 : - currentTimeInNanos() - taskStartTimeNanos; + currentTimeInNanos() - taskStartTimeNanos; } @Override diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java index 9bf4f0e5..2795b7b5 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java @@ -105,6 +105,8 @@ public class StorageClient /** * Revoke the credentials of a {@link RestoreJob} * It should be called when the job is in a final {@link org.apache.cassandra.sidecar.common.data.RestoreJobStatus} + * + * @param jobId the unique identifier for the job */ public void revokeCredentials(UUID jobId) { @@ -132,7 +134,7 @@ public class StorageClient .build(); return client.headObject(request) - .whenComplete(logCredentialOnRequestFailure(credentials)); + .whenComplete(logCredentialOnRequestFailure(slice, credentials)); } public CompletableFuture<File> downloadObjectIfAbsent(RestoreSlice slice) @@ -140,6 +142,7 @@ public class StorageClient Credentials credentials = credentialsProviders.get(slice.jobId()); if (credentials == null) { + LOGGER.debug("Credentials to download object not found. jobId={}", slice.jobId()); CompletableFuture<File> failedFuture = new CompletableFuture<>(); failedFuture.completeExceptionally(credentialsNotFound(slice)); return failedFuture; @@ -156,6 +159,8 @@ public class StorageClient File object = objectPath.toFile(); if (object.exists()) { + LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}", + slice.jobId(), slice.stagedObjectPath()); // Skip downloading if the file already exists on disk. It should be a rare scenario. // Note that the on-disk file could be different from the remote object, although the name matches. // TODO 1: verify etag does not change after s3 replication and batch copy @@ -166,10 +171,12 @@ public class StorageClient } if (!object.getParentFile().mkdirs()) { - LOGGER.warn("Error occurred while creating directory for S3 object {}", objectPath); + LOGGER.warn("Error occurred while creating directory. jobId={} s3_object={}", + slice.jobId(), slice.stagedObjectPath()); } - return rateLimitedGetObject(client, request, objectPath) - .whenComplete(logCredentialOnRequestFailure(credentials)) + LOGGER.info("Downloading object. jobId={} s3_object={}", slice.jobId(), slice.stagedObjectPath()); + return rateLimitedGetObject(slice, client, request, objectPath) + .whenComplete(logCredentialOnRequestFailure(slice, credentials)) .thenApply(res -> object); } @@ -206,13 +213,14 @@ public class StorageClient "jobId: " + slice.jobId()); } - private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(Credentials credentials) + private BiConsumer<Object, ? super Throwable> logCredentialOnRequestFailure(RestoreSlice slice, + Credentials credentials) { return (ignored, cause) -> { if (cause != null) { - LOGGER.error("GetObjectRequest is not successful. credentials={}", - credentials.readCredentials, cause); + LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}", + slice.jobId(), credentials.readCredentials, cause); } }; } @@ -221,17 +229,21 @@ public class StorageClient * Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file * applying rate limiting on the download throughput. * + * @param slice the slice to be restored * @param client the S3 client * @param request the {@link GetObjectRequest request} * @param destinationPath the path where the object will be persisted * @return a {@link CompletableFuture} of the {@link GetObjectResponse} */ - private CompletableFuture<GetObjectResponse> rateLimitedGetObject(S3AsyncClient client, + private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreSlice slice, + S3AsyncClient client, GetObjectRequest request, Path destinationPath) { return client.getObject(request, AsyncResponseTransformer.toPublisher()) - .thenCompose(responsePublisher -> subscribeRateLimitedWrite(destinationPath, responsePublisher)); + .thenCompose(responsePublisher -> subscribeRateLimitedWrite(slice, + destinationPath, + responsePublisher)); } /** @@ -239,11 +251,13 @@ public class StorageClient * by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting * the download throughput using the {@code downloadRateLimiter} object. * + * @param slice the slice to be restored * @param destinationPath the path where the object will be persisted * @param publisher the {@link ResponsePublisher} * @return a {@link CompletableFuture} to the {@link GetObjectResponse} */ - CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(Path destinationPath, + CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreSlice slice, + Path destinationPath, ResponsePublisher<GetObjectResponse> publisher) { WritableByteChannel channel; @@ -257,10 +271,14 @@ public class StorageClient } catch (FileAlreadyExistsException fileAlreadyExistsException) { + LOGGER.debug("Skipping download, file already exists. jobId={} s3_object={}", + slice.jobId(), slice.stagedObjectPath()); return CompletableFuture.completedFuture(publisher.response()); } catch (IOException e) { + LOGGER.error("Error occurred while creating channel. destinationPath={} jobId={} s3_object={}", + destinationPath, slice.jobId(), slice.stagedObjectPath(), e); throw new RuntimeException(e); } // CompletableFuture that will be notified when all events have been consumed or if an error occurs. @@ -272,6 +290,8 @@ public class StorageClient } catch (IOException e) { + LOGGER.error("Error occurred while downloading. jobId={} s3_object={}", + slice.jobId(), slice.stagedObjectPath(), e); throw new RuntimeException(e); } }).whenComplete((v, subscribeThrowable) -> closeChannel(channel)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org