This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new ca8dd2e CASSANDRASC-64: File descriptor is not being closed on MD5 checksum ca8dd2e is described below commit ca8dd2e0381a19ede99ce4b70959ce7a584ea0d2 Author: Francisco Guerrero <francisco.guerr...@apple.com> AuthorDate: Tue Jul 11 04:30:59 2023 -0700 CASSANDRASC-64: File descriptor is not being closed on MD5 checksum This commit fixes an issue where the file descriptors are not being closed after the MD5 checksum is calculated during SSTable upload. This can potentially cause the JVM to run out of available file descriptors during execution, and it would force an operator to restart the Sidecar process to return to a working state. In this commit, we ensure the file is closed after the checksum is calculated. Additionally, this commit fixes a rare ConcurrentModificationException encountered in `SSTableImporter` where the `importQueuePerHost` does not use a thread-safe map. patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-64 --- CHANGES.txt | 1 + .../sstableuploads/SSTableUploadHandler.java | 14 +++ .../sidecar/utils/MD5ChecksumVerifier.java | 29 +++-- .../cassandra/sidecar/utils/SSTableImporter.java | 4 +- .../cassandra/sidecar/utils/SSTableUploader.java | 3 +- .../sstableuploads/SSTableUploadHandlerTest.java | 66 ++++++++-- .../sidecar/utils/MD5ChecksumVerifierTest.java | 133 +++++++++++++++++++++ 7 files changed, 226 insertions(+), 24 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 15ac7f7..5dc930b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * File descriptor is not being closed on MD5 checksum (CASSANDRASC-64) * Expose JMX host and port from JMXClient (CASSANDRASC-59) * Support retries in Sidecar Client on Invalid Checksum (CASSANDRASC-58) * Ignore unknown properties during Sidecar client deserialization (CASSANDRASC-53) diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java index 99babe1..5497b4d 100644 --- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java +++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java @@ -129,6 +129,20 @@ public class SSTableUploadHandler extends AbstractHandler<SSTableUploadRequest> .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); } + @Override + protected void processFailure(Throwable cause, RoutingContext context, String host, SocketAddress remoteAddress, + SSTableUploadRequest request) + { + if (cause instanceof IllegalArgumentException) + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage(), cause)); + } + else + { + super.processFailure(cause, context, host, remoteAddress, request); + } + } + /** * {@inheritDoc} */ diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java index 378b1ab..132ff10 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java @@ -31,6 +31,7 @@ import io.vertx.core.file.AsyncFile; import io.vertx.core.file.FileSystem; import io.vertx.core.file.OpenOptions; import io.vertx.ext.web.handler.HttpException; +import org.jetbrains.annotations.VisibleForTesting; import static org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH; @@ -56,26 +57,27 @@ public class MD5ChecksumVerifier implements ChecksumVerifier return Future.succeededFuture(filePath); } - LOGGER.debug("Validating MD5. Expected to match: {}", expectedChecksum); + LOGGER.debug("Validating MD5. 
expected_checksum={}", expectedChecksum); return fs.open(filePath, new OpenOptions()) .compose(this::calculateMD5) .compose(computedChecksum -> { if (!expectedChecksum.equals(computedChecksum)) + { + LOGGER.error("Checksum mismatch. computed_checksum={}, expected_checksum={}, algorithm=MD5", + computedChecksum, expectedChecksum); return Future.failedFuture(new HttpException(CHECKSUM_MISMATCH.code(), String.format("Checksum mismatch. " - + "computed_checksum=%s, " + "expected_checksum=%s, " + "algorithm=MD5", - computedChecksum, expectedChecksum))); - LOGGER.debug("Checksum mismatch. computed_checksum={}, expected_checksum={}, algorithm=MD5", - computedChecksum, expectedChecksum); + } return Future.succeededFuture(filePath); }); } - private Future<String> calculateMD5(AsyncFile file) + @VisibleForTesting + Future<String> calculateMD5(AsyncFile file) { MessageDigest digest; try @@ -91,12 +93,15 @@ public class MD5ChecksumVerifier implements ChecksumVerifier file.pause() .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE) .handler(buf -> digest.update(buf.getBytes())) - .endHandler(_v -> result.complete(Base64.getEncoder().encodeToString(digest.digest()))) - .exceptionHandler(cause -> - { - LOGGER.error("Error while calculating MD5 checksum", cause); - result.fail(cause); - }) + .endHandler(_v -> { + result.complete(Base64.getEncoder().encodeToString(digest.digest())); + file.end(); + }) + .exceptionHandler(cause -> { + LOGGER.error("Error while calculating MD5 checksum", cause); + result.fail(cause); + file.end(); + }) .resume(); return result.future(); } diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java index b6f502c..b962de0 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java @@ -18,10 +18,10 @@ package org.apache.cassandra.sidecar.utils; -import 
java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,7 +86,7 @@ public class SSTableImporter this.executorPools = executorPools; this.metadataFetcher = metadataFetcher; this.uploadPathBuilder = uploadPathBuilder; - this.importQueuePerHost = new HashMap<>(); + this.importQueuePerHost = new ConcurrentHashMap<>(); executorPools.internal() .setPeriodic(configuration.getSSTableImportPollIntervalMillis(), this::processPendingImports); } diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java index d65fa36..6a5b271 100644 --- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java +++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java @@ -88,7 +88,8 @@ public class SSTableUploader { // pipe read stream to temp file return streamToFile(readStream, tempFilePath) - .compose(v -> checksumVerifier.verify(expectedChecksum, tempFilePath)); + .compose(v -> checksumVerifier.verify(expectedChecksum, tempFilePath)) + .onFailure(throwable -> fs.delete(tempFilePath)); } private Future<Void> streamToFile(ReadStream<Buffer> readStream, String tempFilename) diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java index 6ed3d2c..f4a5767 100644 --- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java @@ -25,6 +25,7 @@ import java.nio.file.Paths; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import 
java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -34,7 +35,9 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; import io.vertx.core.file.OpenOptions; +import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -115,6 +118,19 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest false); } + @Test + public void testInvalidUploadId(VertxTestContext context) throws IOException + { + sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", "with-lesser-content-length.db", "", + Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), + false, response -> { + JsonObject error = response.bodyAsJsonObject(); + assertThat(error.getString("status")).isEqualTo("Bad Request"); + assertThat(error.getInteger("code")).isEqualTo(400); + assertThat(error.getString("message")).isEqualTo("Invalid upload id is supplied, uploadId=foo"); + }); + } + @Test public void testInvalidKeyspace(VertxTestContext context) throws IOException { @@ -162,9 +178,9 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest UUID uploadId = UUID.randomUUID(); CountDownLatch latch = new CountDownLatch(1); - sendUploadRequestAndVerify(latch, context, uploadId, "invalidKeyspace", "tbl", "without-md5.db", "", - Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(), - false); + sendUploadRequestAndVerify(latch, context, uploadId.toString(), "invalidKeyspace", "tbl", + "without-md5.db", "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)), + HttpResponseStatus.BAD_REQUEST.code(), false); assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); @@ -183,13 +199,13 @@ public class 
SSTableUploadHandlerTest extends BaseUploadsHandlerTest int expectedRetCode, boolean expectTimeout) { - sendUploadRequestAndVerify(null, context, uploadId, keyspace, tableName, targetFileName, expectedMd5, - fileLength, expectedRetCode, expectTimeout); + sendUploadRequestAndVerify(null, context, uploadId.toString(), keyspace, tableName, targetFileName, + expectedMd5, fileLength, expectedRetCode, expectTimeout); } private void sendUploadRequestAndVerify(CountDownLatch latch, VertxTestContext context, - UUID uploadId, + String uploadId, String keyspace, String tableName, String targetFileName, @@ -197,9 +213,34 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest long fileLength, int expectedRetCode, boolean expectTimeout) + { + sendUploadRequestAndVerify(latch, + context, + uploadId, + keyspace, + tableName, + targetFileName, + expectedMd5, + fileLength, + expectedRetCode, + expectTimeout, + null); + } + + private void sendUploadRequestAndVerify(CountDownLatch latch, + VertxTestContext context, + String uploadId, + String keyspace, + String tableName, + String targetFileName, + String expectedMd5, + long fileLength, + int expectedRetCode, + boolean expectTimeout, + Consumer<HttpResponse<Buffer>> responseValidator) { WebClient client = WebClient.create(vertx); - String testRoute = "/api/v1/uploads/" + uploadId.toString() + "/keyspaces/" + keyspace + String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + keyspace + "/tables/" + tableName + "/components/" + targetFileName; HttpRequest<Buffer> req = client.put(config.getPort(), "localhost", testRoute); if (!expectedMd5.isEmpty()) @@ -222,11 +263,18 @@ public class SSTableUploadHandlerTest extends BaseUploadsHandlerTest return; } - assertThat(response.result().statusCode()).isEqualTo(expectedRetCode); + HttpResponse<Buffer> httpResponse = response.result(); + assertThat(httpResponse.statusCode()).isEqualTo(expectedRetCode); + + if (responseValidator != null) + { + 
responseValidator.accept(httpResponse); + } + if (expectedRetCode == HttpResponseStatus.OK.code()) { Path targetFilePath = Paths.get(SnapshotUtils.makeStagingDir(temporaryFolder.getAbsolutePath()), - uploadId.toString(), keyspace, tableName, targetFileName); + uploadId, keyspace, tableName, targetFileName); assertThat(Files.exists(targetFilePath)).isTrue(); } diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java new file mode 100644 index 0000000..8eb5680 --- /dev/null +++ b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.utils; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.FileSystem; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link MD5ChecksumVerifier} + */ +class MD5ChecksumVerifierTest +{ + static Vertx vertx; + static ExposeAsyncFileMD5ChecksumVerifier verifier; + + @TempDir + Path tempDir; + + @BeforeAll + static void setup() + { + vertx = Vertx.vertx(); + verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem()); + } + + @Test + void testFileDescriptorsClosedWithValidChecksum() throws IOException, NoSuchAlgorithmException, + InterruptedException + { + byte[] randomBytes = generateRandomBytes(); + Path randomFilePath = writeBytesToRandomFile(randomBytes); + String expectedChecksum = Base64.getEncoder() + .encodeToString(MessageDigest.getInstance("MD5") + .digest(randomBytes)); + + runTestScenario(randomFilePath, expectedChecksum); + } + + @Test + void testFileDescriptorsClosedWithInvalidChecksum() throws IOException, InterruptedException + { + Path randomFilePath = writeBytesToRandomFile(generateRandomBytes()); + runTestScenario(randomFilePath, "invalid"); + } + + private void runTestScenario(Path filePath, String checksum) throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + verifier.verify(checksum, filePath.toAbsolutePath().toString()) + 
.onComplete(complete -> latch.countDown()); + + assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue(); + + assertThat(verifier.file).isNotNull(); + // we can't close the file if it's already closed, so we expect the exception here + assertThatThrownBy(() -> verifier.file.end()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("File handle is closed"); + } + + private byte[] generateRandomBytes() + { + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } + + private Path writeBytesToRandomFile(byte[] bytes) throws IOException + { + Path tempPath = tempDir.resolve("random-file.txt"); + try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(), "rw")) + { + writer.write(bytes); + } + return tempPath; + } + + /** + * Class that extends from {@link MD5ChecksumVerifier} for testing purposes and holds a reference to the + * {@link AsyncFile} to ensure that the file has been closed. + */ + static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier + { + AsyncFile file; + + public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs) + { + super(fs); + } + + @Override + Future<String> calculateMD5(AsyncFile file) + { + this.file = file; + return super.calculateMD5(file); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org