This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ca8dd2e  CASSANDRASC-64: File descriptor is not being closed on MD5 
checksum
ca8dd2e is described below

commit ca8dd2e0381a19ede99ce4b70959ce7a584ea0d2
Author: Francisco Guerrero <francisco.guerr...@apple.com>
AuthorDate: Tue Jul 11 04:30:59 2023 -0700

    CASSANDRASC-64: File descriptor is not being closed on MD5 checksum
    
    This commit fixes an issue where the file descriptors are not being closed 
while
    the MD5 checksum is calculated during SSTable upload. This can potentially cause the JVM 
to run
    out of available file descriptors during execution, and it would force an 
operator
    to restart the Sidecar process to return to a working state.
    
    In this commit, we ensure the file is closed after the checksum is 
calculated.
    
    Additionally, this commit fixes a rare ConcurrentModificationException 
encountered
    in `SSTableImporter` where the `importQueuePerHost` does not use a 
thread-safe map.
    
    patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for 
CASSANDRASC-64
---
 CHANGES.txt                                        |   1 +
 .../sstableuploads/SSTableUploadHandler.java       |  14 +++
 .../sidecar/utils/MD5ChecksumVerifier.java         |  29 +++--
 .../cassandra/sidecar/utils/SSTableImporter.java   |   4 +-
 .../cassandra/sidecar/utils/SSTableUploader.java   |   3 +-
 .../sstableuploads/SSTableUploadHandlerTest.java   |  66 ++++++++--
 .../sidecar/utils/MD5ChecksumVerifierTest.java     | 133 +++++++++++++++++++++
 7 files changed, 226 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 15ac7f7..5dc930b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * File descriptor is not being closed on MD5 checksum (CASSANDRASC-64)
  * Expose JMX host and port from JMXClient (CASSANDRASC-59)
  * Support retries in Sidecar Client on Invalid Checksum (CASSANDRASC-58)
  * Ignore unknown properties during Sidecar client deserialization 
(CASSANDRASC-53)
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index 99babe1..5497b4d 100644
--- 
a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ 
b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -129,6 +129,20 @@ public class SSTableUploadHandler extends 
AbstractHandler<SSTableUploadRequest>
         .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
     }
 
+    @Override
+    protected void processFailure(Throwable cause, RoutingContext context, 
String host, SocketAddress remoteAddress,
+                                  SSTableUploadRequest request)
+    {
+        if (cause instanceof IllegalArgumentException)
+        {
+            context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
cause.getMessage(), cause));
+        }
+        else
+        {
+            super.processFailure(cause, context, host, remoteAddress, request);
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
index 378b1ab..132ff10 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java
@@ -31,6 +31,7 @@ import io.vertx.core.file.AsyncFile;
 import io.vertx.core.file.FileSystem;
 import io.vertx.core.file.OpenOptions;
 import io.vertx.ext.web.handler.HttpException;
+import org.jetbrains.annotations.VisibleForTesting;
 
 import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpResponseStatus.CHECKSUM_MISMATCH;
 
@@ -56,26 +57,27 @@ public class MD5ChecksumVerifier implements ChecksumVerifier
             return Future.succeededFuture(filePath);
         }
 
-        LOGGER.debug("Validating MD5. Expected to match: {}", 
expectedChecksum);
+        LOGGER.debug("Validating MD5. expected_checksum={}", expectedChecksum);
 
         return fs.open(filePath, new OpenOptions())
                  .compose(this::calculateMD5)
                  .compose(computedChecksum -> {
                      if (!expectedChecksum.equals(computedChecksum))
+                     {
+                         LOGGER.error("Checksum mismatch. 
computed_checksum={}, expected_checksum={}, algorithm=MD5",
+                                      computedChecksum, expectedChecksum);
                          return Future.failedFuture(new 
HttpException(CHECKSUM_MISMATCH.code(),
                                                                       
String.format("Checksum mismatch. "
-                                                                               
     + "computed_checksum=%s, "
                                                                                
     + "expected_checksum=%s, "
                                                                                
     + "algorithm=MD5",
-                                                                               
     computedChecksum,
                                                                                
     expectedChecksum)));
-                     LOGGER.debug("Checksum mismatch. computed_checksum={}, 
expected_checksum={}, algorithm=MD5",
-                                  computedChecksum, expectedChecksum);
+                     }
                      return Future.succeededFuture(filePath);
                  });
     }
 
-    private Future<String> calculateMD5(AsyncFile file)
+    @VisibleForTesting
+    Future<String> calculateMD5(AsyncFile file)
     {
         MessageDigest digest;
         try
@@ -91,12 +93,15 @@ public class MD5ChecksumVerifier implements ChecksumVerifier
         file.pause()
             .setReadBufferSize(DEFAULT_READ_BUFFER_SIZE)
             .handler(buf -> digest.update(buf.getBytes()))
-            .endHandler(_v -> 
result.complete(Base64.getEncoder().encodeToString(digest.digest())))
-            .exceptionHandler(cause ->
-                              {
-                                  LOGGER.error("Error while calculating MD5 
checksum", cause);
-                                  result.fail(cause);
-                              })
+            .endHandler(_v -> {
+                
result.complete(Base64.getEncoder().encodeToString(digest.digest()));
+                file.end();
+            })
+            .exceptionHandler(cause -> {
+                LOGGER.error("Error while calculating MD5 checksum", cause);
+                result.fail(cause);
+                file.end();
+            })
             .resume();
         return result.future();
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index b6f502c..b962de0 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.sidecar.utils;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -86,7 +86,7 @@ public class SSTableImporter
         this.executorPools = executorPools;
         this.metadataFetcher = metadataFetcher;
         this.uploadPathBuilder = uploadPathBuilder;
-        this.importQueuePerHost = new HashMap<>();
+        this.importQueuePerHost = new ConcurrentHashMap<>();
         executorPools.internal()
                      
.setPeriodic(configuration.getSSTableImportPollIntervalMillis(), 
this::processPendingImports);
     }
diff --git 
a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java 
b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index d65fa36..6a5b271 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -88,7 +88,8 @@ public class SSTableUploader
     {
         // pipe read stream to temp file
         return streamToFile(readStream, tempFilePath)
-               .compose(v -> checksumVerifier.verify(expectedChecksum, 
tempFilePath));
+               .compose(v -> checksumVerifier.verify(expectedChecksum, 
tempFilePath))
+               .onFailure(throwable -> fs.delete(tempFilePath));
     }
 
     private Future<Void> streamToFile(ReadStream<Buffer> readStream, String 
tempFilename)
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
 
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 6ed3d2c..f4a5767 100644
--- 
a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++ 
b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -25,6 +25,7 @@ import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,7 +35,9 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.file.AsyncFile;
 import io.vertx.core.file.OpenOptions;
+import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
@@ -115,6 +118,19 @@ public class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                    false);
     }
 
+    @Test
+    public void testInvalidUploadId(VertxTestContext context) throws 
IOException
+    {
+        sendUploadRequestAndVerify(null, context, "foo", "ks", "tbl", 
"with-lesser-content-length.db", "",
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
+                                   false, response -> {
+            JsonObject error = response.bodyAsJsonObject();
+            assertThat(error.getString("status")).isEqualTo("Bad Request");
+            assertThat(error.getInteger("code")).isEqualTo(400);
+            assertThat(error.getString("message")).isEqualTo("Invalid upload 
id is supplied, uploadId=foo");
+        });
+    }
+
     @Test
     public void testInvalidKeyspace(VertxTestContext context) throws 
IOException
     {
@@ -162,9 +178,9 @@ public class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
 
         UUID uploadId = UUID.randomUUID();
         CountDownLatch latch = new CountDownLatch(1);
-        sendUploadRequestAndVerify(latch, context, uploadId, 
"invalidKeyspace", "tbl", "without-md5.db", "",
-                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)), 
HttpResponseStatus.BAD_REQUEST.code(),
-                                   false);
+        sendUploadRequestAndVerify(latch, context, uploadId.toString(), 
"invalidKeyspace", "tbl",
+                                   "without-md5.db", "", 
Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.BAD_REQUEST.code(), 
false);
 
         assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
 
@@ -183,13 +199,13 @@ public class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                             int expectedRetCode,
                                             boolean expectTimeout)
     {
-        sendUploadRequestAndVerify(null, context, uploadId, keyspace, 
tableName, targetFileName, expectedMd5,
-                                   fileLength, expectedRetCode, expectTimeout);
+        sendUploadRequestAndVerify(null, context, uploadId.toString(), 
keyspace, tableName, targetFileName,
+                                   expectedMd5, fileLength, expectedRetCode, 
expectTimeout);
     }
 
     private void sendUploadRequestAndVerify(CountDownLatch latch,
                                             VertxTestContext context,
-                                            UUID uploadId,
+                                            String uploadId,
                                             String keyspace,
                                             String tableName,
                                             String targetFileName,
@@ -197,9 +213,34 @@ public class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                                             long fileLength,
                                             int expectedRetCode,
                                             boolean expectTimeout)
+    {
+        sendUploadRequestAndVerify(latch,
+                                   context,
+                                   uploadId,
+                                   keyspace,
+                                   tableName,
+                                   targetFileName,
+                                   expectedMd5,
+                                   fileLength,
+                                   expectedRetCode,
+                                   expectTimeout,
+                                   null);
+    }
+
+    private void sendUploadRequestAndVerify(CountDownLatch latch,
+                                            VertxTestContext context,
+                                            String uploadId,
+                                            String keyspace,
+                                            String tableName,
+                                            String targetFileName,
+                                            String expectedMd5,
+                                            long fileLength,
+                                            int expectedRetCode,
+                                            boolean expectTimeout,
+                                            Consumer<HttpResponse<Buffer>> 
responseValidator)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/api/v1/uploads/" + uploadId.toString() + 
"/keyspaces/" + keyspace
+        String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + 
keyspace
                            + "/tables/" + tableName + "/components/" + 
targetFileName;
         HttpRequest<Buffer> req = client.put(config.getPort(), "localhost", 
testRoute);
         if (!expectedMd5.isEmpty())
@@ -222,11 +263,18 @@ public class SSTableUploadHandlerTest extends 
BaseUploadsHandlerTest
                 return;
             }
 
-            
assertThat(response.result().statusCode()).isEqualTo(expectedRetCode);
+            HttpResponse<Buffer> httpResponse = response.result();
+            assertThat(httpResponse.statusCode()).isEqualTo(expectedRetCode);
+
+            if (responseValidator != null)
+            {
+                responseValidator.accept(httpResponse);
+            }
+
             if (expectedRetCode == HttpResponseStatus.OK.code())
             {
                 Path targetFilePath = 
Paths.get(SnapshotUtils.makeStagingDir(temporaryFolder.getAbsolutePath()),
-                                                uploadId.toString(), keyspace, 
tableName, targetFileName);
+                                                uploadId, keyspace, tableName, 
targetFileName);
                 assertThat(Files.exists(targetFilePath)).isTrue();
             }
 
diff --git 
a/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java 
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
new file mode 100644
index 0000000..8eb5680
--- /dev/null
+++ 
b/src/test/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifierTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link MD5ChecksumVerifier}
+ */
+class MD5ChecksumVerifierTest
+{
+    static Vertx vertx;
+    static ExposeAsyncFileMD5ChecksumVerifier verifier;
+
+    @TempDir
+    Path tempDir;
+
+    @BeforeAll
+    static void setup()
+    {
+        vertx = Vertx.vertx();
+        verifier = new ExposeAsyncFileMD5ChecksumVerifier(vertx.fileSystem());
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithValidChecksum() throws IOException, 
NoSuchAlgorithmException,
+                                                             
InterruptedException
+    {
+        byte[] randomBytes = generateRandomBytes();
+        Path randomFilePath = writeBytesToRandomFile(randomBytes);
+        String expectedChecksum = Base64.getEncoder()
+                                        
.encodeToString(MessageDigest.getInstance("MD5")
+                                                                     
.digest(randomBytes));
+
+        runTestScenario(randomFilePath, expectedChecksum);
+    }
+
+    @Test
+    void testFileDescriptorsClosedWithInvalidChecksum() throws IOException, 
InterruptedException
+    {
+        Path randomFilePath = writeBytesToRandomFile(generateRandomBytes());
+        runTestScenario(randomFilePath, "invalid");
+    }
+
+    private void runTestScenario(Path filePath, String checksum) throws 
InterruptedException
+    {
+        CountDownLatch latch = new CountDownLatch(1);
+        verifier.verify(checksum, filePath.toAbsolutePath().toString())
+                .onComplete(complete -> latch.countDown());
+
+        assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
+
+        assertThat(verifier.file).isNotNull();
+        // we can't close the file if it's already closed, so we expect the 
exception here
+        assertThatThrownBy(() -> verifier.file.end())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("File handle is closed");
+    }
+
+    private byte[] generateRandomBytes()
+    {
+        byte[] bytes = new byte[1024];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        return bytes;
+    }
+
+    private Path writeBytesToRandomFile(byte[] bytes) throws IOException
+    {
+        Path tempPath = tempDir.resolve("random-file.txt");
+        try (RandomAccessFile writer = new RandomAccessFile(tempPath.toFile(), 
"rw"))
+        {
+            writer.write(bytes);
+        }
+        return tempPath;
+    }
+
+    /**
+     * Class that extends from {@link MD5ChecksumVerifier} for testing 
purposes and holds a reference to the
+     * {@link AsyncFile} to ensure that the file has been closed.
+     */
+    static class ExposeAsyncFileMD5ChecksumVerifier extends MD5ChecksumVerifier
+    {
+        AsyncFile file;
+
+        public ExposeAsyncFileMD5ChecksumVerifier(FileSystem fs)
+        {
+            super(fs);
+        }
+
+        @Override
+        Future<String> calculateMD5(AsyncFile file)
+        {
+            this.file = file;
+            return super.calculateMD5(file);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to