This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 9912a62 CASSANDRASC-56: Create staging directory if it doesn't exists 9912a62 is described below commit 9912a620a0e67d9aa723037aaf5237598a895eb7 Author: Francisco Guerrero <francisco.guerr...@apple.com> AuthorDate: Mon Jun 19 16:41:10 2023 -0700 CASSANDRASC-56: Create staging directory if it doesn't exists During SSTable upload, the upload will fail if the configured staging directory does not exist. When this occurs, an operator must manually create the directory, which increases the configuration toil. In this commit, we automatically create the staging directory if it doesn't exist during SSTable upload. This improves the overall operational experience when running the Sidecar. patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-56 --- CHANGES.txt | 1 + .../sstableuploads/SSTableCleanupHandler.java | 2 +- .../sstableuploads/SSTableImportHandler.java | 52 ++++++++++++---------- .../sstableuploads/SSTableUploadHandler.java | 18 ++++---- .../cassandra/sidecar/utils/BaseFileSystem.java | 11 +++++ .../cassandra/sidecar/utils/SSTableImporter.java | 2 +- .../sidecar/utils/SSTableUploadsPathBuilder.java | 23 +++++++--- .../sstableuploads/SSTableImportHandlerTest.java | 13 ++++-- .../sstableuploads/SSTableUploadHandlerTest.java | 17 ------- .../sidecar/utils/SSTableImporterTest.java | 2 +- 10 files changed, 77 insertions(+), 64 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 86f8bf0..7131a8b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Create staging directory if it doesn't exists (CASSANDRASC-56) * Remove RESTEasy (CASSANDRASC-57) * Use in-jvm dtest framework for integration tests (CASSANDRASC-51) * Sidecar returns own version in node settings (CASSANDRASC-52) diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java index 849fd3d..bffb9a5 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java @@ -66,7 +66,7 @@ public class SSTableCleanupHandler extends AbstractHandler<String> SocketAddress remoteAddress, String uploadId) { - uploadPathBuilder.resolveStagingDirectory(host, uploadId) + uploadPathBuilder.resolveUploadIdDirectory(host, uploadId) .compose(uploadPathBuilder::isValidDirectory) .compose(stagingDirectory -> context.vertx() .fileSystem() diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java index c5d5695..75f8049 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java @@ -109,7 +109,7 @@ public class SSTableImportHandler extends AbstractHandler<SSTableImportRequest> } else if (importResult.failed()) { - context.fail(importResult.cause()); + processFailure(importResult.cause(), context, host, remoteAddress, request); } else { @@ -121,29 +121,33 @@ public class SSTableImportHandler extends AbstractHandler<SSTableImportRequest> request, remoteAddress, host); } }) - .onFailure(cause -> { - if (cause instanceof NoSuchFileException) - { - logger.error("Upload directory not found for request={}, remoteAddress={}, " + - "instance={}", request, remoteAddress, host, cause); - context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage())); - } - else if (cause instanceof IllegalArgumentException) - { - context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage(), - cause)); - } - else if (cause instanceof HttpException) - 
{ - context.fail(cause); - } - else - { - logger.error("Unexpected error during import SSTables for request={}, " + - "remoteAddress={}, instance={}", request, remoteAddress, host, cause); - context.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); - } - }); + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); + } + + @Override + protected void processFailure(Throwable cause, + RoutingContext context, + String host, + SocketAddress remoteAddress, + SSTableImportRequest request) + { + if (cause instanceof NoSuchFileException) + { + logger.error("Upload directory not found for request={}, remoteAddress={}, " + + "instance={}", request, remoteAddress, host, cause); + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage())); + } + else if (cause instanceof IllegalArgumentException) + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage(), + cause)); + } + else if (cause instanceof HttpException) + { + context.fail(cause); + } + + super.processFailure(cause, context, host, remoteAddress, request); } @Override diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java index 7952d5d..99babe1 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java @@ -32,7 +32,6 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import org.apache.cassandra.sidecar.Configuration; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.common.data.SSTableUploadResponse; import 
org.apache.cassandra.sidecar.common.utils.CassandraInputValidator; @@ -103,8 +102,6 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest> // accept the upload. httpRequest.pause(); - InstanceMetadata instanceMetadata = metadataFetcher.instance(host); - long startTimeInNanos = System.nanoTime(); if (!limiter.tryAcquire()) { @@ -116,7 +113,8 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest> context.addEndHandler(v -> limiter.releasePermit()); validateKeyspaceAndTable(host, request) - .compose(validRequest -> ensureSufficientSpaceAvailable(instanceMetadata)) + .compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host)) + .compose(this::ensureSufficientSpaceAvailable) .compose(v -> uploadPathBuilder.build(host, request)) .compose(uploadDirectory -> uploader.uploadComponent(httpRequest, uploadDirectory, request.component(), request.expectedChecksum())) @@ -187,17 +185,17 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest> * Ensures there is sufficient space available as per configured in the * {@link Configuration#getMinSpacePercentRequiredForUpload()}. 
* - * @param instanceMetadata instance meta data + * @param uploadDirectory the directory where the SSTables are uploaded * @return a succeeded future if there is sufficient space available, or failed future otherwise */ - private Future<Void> ensureSufficientSpaceAvailable(InstanceMetadata instanceMetadata) + private Future<String> ensureSufficientSpaceAvailable(String uploadDirectory) { float minimumPercentageRequired = configuration.getMinSpacePercentRequiredForUpload(); if (minimumPercentageRequired == 0) { - return Future.succeededFuture(); + return Future.succeededFuture(uploadDirectory); } - return fs.fsProps(instanceMetadata.stagingDir()) + return fs.fsProps(uploadDirectory) .compose(fsProps -> { // calculate available disk space percentage long totalSpace = fsProps.totalSpace(); @@ -213,12 +211,12 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest> if (availableDiskSpacePercentage < minimumPercentageRequired) { logger.warn("Insufficient space available for upload in stagingDir={}, available={}%, " + - "required={}%", instanceMetadata.stagingDir(), + "required={}%", uploadDirectory, availableDiskSpacePercentage, minimumPercentageRequired); return Future.failedFuture(wrapHttpException(HttpResponseStatus.INSUFFICIENT_STORAGE, "Insufficient space available for upload")); } - return Future.succeededFuture(); + return Future.succeededFuture(uploadDirectory); }); } } diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java index f7f7c56..fc0b532 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java @@ -96,6 +96,17 @@ public class BaseFileSystem return isValidOfType(path, FileProps::isDirectory); } + /** + * Creates the directory if it doesn't exist, and then validates that {@code path} is a valid directory. 
+ * + * @param path the path to the directory + * @return a future of the validated {@code path}, a failed future otherwise + */ + public Future<String> ensureDirectoryExists(String path) + { + return fs.mkdirs(path).compose(v -> Future.succeededFuture(path)); + } + /** * @param filename the path * @param predicate a predicate that evaluates based on {@link FileProps} diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java index af5f5dc..b6f502c 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java @@ -252,7 +252,7 @@ public class SSTableImporter */ private void cleanup(ImportOptions options) { - uploadPathBuilder.resolveStagingDirectory(options.host, options.uploadId) + uploadPathBuilder.resolveUploadIdDirectory(options.host, options.uploadId) .compose(uploadPathBuilder::isValidDirectory) .compose(stagingDirectory -> vertx.fileSystem() .deleteRecursive(stagingDirectory, true)) diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java index cbf0bc3..9a26a54 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java @@ -74,11 +74,24 @@ public class SSTableUploadsPathBuilder extends BaseFileSystem public <T extends SSTableUploads> Future<String> build(String host, T request) { return validate(request) - .compose(validRequest -> resolveStagingDirectory(host, request.uploadId())) + .compose(validRequest -> resolveUploadIdDirectory(host, request.uploadId())) .compose(stagingDirectory -> resolveUploadDirectory(stagingDirectory, request.keyspace(), request.tableName())); } + /** + * Builds the path to the configured staging directory for 
the given {@code host}. Attempt to create the + * staging directory if it doesn't exist. + * + * @param host the name of the host + * @return a future to the created and validated staging directory + */ + public Future<String> resolveStagingDirectory(String host) + { + InstanceMetadata instanceMeta = instancesConfig.instanceFromHost(host); + return ensureDirectoryExists(StringUtils.removeEnd(instanceMeta.stagingDir(), File.separator)); + } + /** * Builds the path to the {@code uploadId} staging directory inside the specified {@code host}. * @@ -86,13 +99,11 @@ public class SSTableUploadsPathBuilder extends BaseFileSystem * @param uploadId an identifier for the upload ID * @return the absolute path of the {@code uploadId} staging directory */ - public Future<String> resolveStagingDirectory(String host, String uploadId) + public Future<String> resolveUploadIdDirectory(String host, String uploadId) { return validateUploadId(uploadId) - .compose(validUploadId -> { - InstanceMetadata instanceMeta = instancesConfig.instanceFromHost(host); - return isValidDirectory(StringUtils.removeEnd(instanceMeta.stagingDir(), File.separator)); - }) + .compose(validUploadId -> resolveStagingDirectory(host)) + .compose(this::isValidDirectory) .compose(directory -> Future.succeededFuture(directory + File.separatorChar + uploadId)); } diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java index 5b0a51c..e73fa53 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java @@ -103,12 +103,17 @@ public class SSTableImportHandlerTest extends BaseUploadsHandlerTest } @Test - void testNonExistentUploadDirectory(VertxTestContext context) + void testNonExistentUploadDirectory(VertxTestContext context) 
throws InterruptedException { UUID uploadId = UUID.randomUUID(); - client.put(config.getPort(), "localhost", "/api/v1/uploads/" + uploadId + "/keyspaces/ks/tables/table/import") - .expect(ResponsePredicate.SC_NOT_FOUND) - .send(context.succeedingThenComplete()); + + TableOperations mockCFOperations = mock(TableOperations.class); + when(mockDelegate.tableOperations()).thenReturn(mockCFOperations); + + String requestURI = "/api/v1/uploads/" + uploadId + "/keyspaces/ks/tables/table/import"; + clientRequest(context, requestURI, + response -> assertThat(response.statusCode()) + .isEqualTo(HttpResponseStatus.NOT_FOUND.code())); } @Test diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java index 6888f3d..e29f055 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java @@ -57,7 +57,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false); } @@ -66,7 +65,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false); } @@ -75,7 +73,6 @@ public class SSTableUploadHandlerTest 
extends BaseUploadsHandlerTest void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db", "incorrectMd5", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), false); @@ -85,7 +82,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest void testInvalidFileName_expectErrorCode(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), false); @@ -95,7 +91,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-content-length.db", "jXd/OF09/siBXSD3SWAm3A==", 0, HttpResponseStatus.OK.code(), false); } @@ -106,7 +101,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest // if we send more than actual length, vertx goes hung, probably looking for more data than exists in the file, // we should see timeout error in this case UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", "", 1000, -1, true); } @@ -114,7 +108,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest void testUploadWithLesserContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", 
"with-lesser-content-length.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(), false); @@ -124,7 +117,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest public void testInvalidKeyspace(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), false); @@ -134,7 +126,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest public void testInvalidTable(VertxTestContext context) throws IOException { UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), false); @@ -146,7 +137,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest when(mockConfiguration.getMinSpacePercentRequiredForUpload()).thenReturn(100F); UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false); @@ -158,7 +148,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(0); UUID uploadId = UUID.randomUUID(); - ensureStagingDirectoryExists(); sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.TOO_MANY_REQUESTS.code(), false); @@ -170,7 +159,6 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(1); UUID uploadId = 
UUID.randomUUID(); - ensureStagingDirectoryExists(); CountDownLatch latch = new CountDownLatch(1); sendUploadRequestAndVerify(latch, context, uploadId, "invalidKeyspace", "tbl", "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), @@ -251,9 +239,4 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest client.close(); }); } - - private void ensureStagingDirectoryExists() throws IOException - { - Files.createDirectories(Paths.get(SnapshotUtils.makeStagingDir(temporaryFolder.getAbsolutePath()))); - } } diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java index eb716a6..a20ca22 100644 --- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java @@ -92,7 +92,7 @@ class SSTableImporterTest // get NullPointerExceptions because the mock is not wired up, and we need to prevent vertx from actually // doing a vertx.filesystem().deleteRecursive(). So we return a failed future with a fake path when checking // if the directory exists. - when(mockUploadPathBuilder.resolveStagingDirectory(anyString(), anyString())) + when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(), anyString())) .thenReturn(Future.failedFuture("fake-path")); when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip cleanup")); importer = new SSTableImporter(vertx, mockMetadataFetcher, mockConfiguration, executorPools, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org