This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch CASSANDRA-19477-trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a79679ebf1bd3e07e1ddb2fd2f512f8cfb800387
Author: Stefan Miklosovic <smikloso...@apache.org>
AuthorDate: Sun Mar 17 02:24:12 2024 +0100

    CASSANDRA-19477
---
 .../apache/cassandra/hints/HintsDescriptor.java    |  8 ++++--
 .../org/apache/cassandra/hints/HintsStore.java     | 29 ++++++++++++++++++----
 .../apache/cassandra/hints/HintsWriteExecutor.java | 25 +++++++++++++++----
 .../org/apache/cassandra/service/StorageProxy.java | 14 ++++++-----
 .../apache/cassandra/hints/HintsCatalogTest.java   |  6 ++---
 5 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java 
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 3079c0b80c..e28ebd53b0 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -81,6 +81,8 @@ final class HintsDescriptor
     final UUID hostId;
     final int version;
     final long timestamp;
+    final String hintsFileName;
+    final String crc32FileName;
 
     final ImmutableMap<String, Object> parameters;
     final ParameterizedClass compressionConfig;
@@ -93,6 +95,8 @@ final class HintsDescriptor
         this.hostId = hostId;
         this.version = version;
         this.timestamp = timestamp;
+        this.hintsFileName = hostId + "-" + timestamp + "-" + version + 
".hints";
+        this.crc32FileName = hostId + "-" + timestamp + "-" + version + 
".crc32";
         compressionConfig = createCompressionConfig(parameters);
 
         EncryptionData encryption = createEncryption(parameters);
@@ -205,12 +209,12 @@ final class HintsDescriptor
 
     String fileName()
     {
-        return String.format("%s-%s-%s.hints", hostId, timestamp, version);
+        return hintsFileName;
     }
 
     String checksumFileName()
     {
-        return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
+        return crc32FileName;
     }
 
     File file(File hintsDirectory)
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java 
b/src/java/org/apache/cassandra/hints/HintsStore.java
index b7850ee2b4..ed06474427 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -61,6 +61,7 @@ final class HintsStore
     private final Deque<HintsDescriptor> dispatchDequeue;
     private final Queue<HintsDescriptor> corruptedFiles;
     private final Map<HintsDescriptor, Long> hintsExpirations;
+    private final Map<String, Long> hintsSizes;
 
     // last timestamp used in a descriptor; make sure to not reuse the same 
timestamp for new descriptors.
     private volatile long lastUsedTimestamp;
@@ -76,9 +77,13 @@ final class HintsStore
         dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
         corruptedFiles = new ConcurrentLinkedQueue<>();
         hintsExpirations = new ConcurrentHashMap<>();
+        hintsSizes = new ConcurrentHashMap<>();
 
         //noinspection resource
         lastUsedTimestamp = descriptors.stream().mapToLong(d -> 
d.timestamp).max().orElse(0L);
+
+        for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, 
corruptedFiles))
+            hintsSizes.put(descriptor.hintsFileName, 
descriptor.file(hintsDirectory).length());
     }
 
     static HintsStore create(UUID hostId, File hintsDirectory, 
ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
@@ -87,6 +92,11 @@ final class HintsStore
         return new HintsStore(hostId, hintsDirectory, writerParams, 
descriptors);
     }
 
+    Map<String, Long> getHintsSizes()
+    {
+        return hintsSizes;
+    }
+
     @VisibleForTesting
     int getDispatchQueueSize()
     {
@@ -203,6 +213,9 @@ final class HintsStore
         {
             dispatchDequeue.removeAll(removeSet);
             corruptedFiles.removeAll(removeSet);
+            // delete(descriptor) may throw before removing its size entry, hence remove it here as well
+            for (HintsDescriptor descriptor : removeSet)
+                hintsSizes.remove(descriptor.fileName());
         }
     }
 
@@ -210,7 +223,11 @@ final class HintsStore
     {
         File hintsFile = descriptor.file(hintsDirectory);
         if (hintsFile.tryDelete())
+        {
+            // this method is also called standalone (without deleteHints)
+            hintsSizes.remove(descriptor.fileName());
             logger.info("Deleted hint file {}", descriptor.fileName());
+        }
         else if (hintsFile.exists())
             logger.error("Failed to delete hint file {}", 
descriptor.fileName());
         else
@@ -235,17 +252,15 @@ final class HintsStore
         dispatchPositions.put(descriptor, inputPosition);
     }
 
-
     /**
      * @return the total size of all files belonging to the hints store, in 
bytes.
      */
     long getTotalFileSize()
     {
         long total = 0;
-        for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, 
corruptedFiles))
-        {
-            total += descriptor.file(hintsDirectory).length();
-        }
+        for (long value : hintsSizes.values())
+            total += value;
+
         return total;
     }
 
@@ -282,7 +297,11 @@ final class HintsStore
     HintsWriter getOrOpenWriter()
     {
         if (hintsWriter == null)
+        {
             hintsWriter = openWriter();
+            hintsSizes.put(hintsWriter.descriptor().fileName(), 
hintsWriter.descriptor().file(hintsDirectory).length());
+        }
+
         return hintsWriter;
     }
 
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 156fbfc018..3cd4bfb43a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.hints;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -260,19 +261,33 @@ final class HintsWriteExecutor
         long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize();
 
         HintsWriter writer = store.getOrOpenWriter();
+        HintsDescriptor descriptor = writer.descriptor();
 
+        // If the creation of a new session throws, we have not written 
anything anyway,
+        // so we do not need to update hints sizes.
+        // If we throw during appending, we will track how many bytes were 
written in the finally block.
         try (HintsWriter.Session session = writer.newSession(writeBuffer))
         {
-            while (iterator.hasNext())
+            try
+            {
+                while (iterator.hasNext())
+                {
+                    session.append(iterator.next());
+                    if (session.position() >= maxHintsFileSize)
+                        break;
+                }
+            }
+            finally
             {
-                session.append(iterator.next());
-                if (session.position() >= maxHintsFileSize)
-                    break;
+                Map<String, Long> hintsSizes = store.getHintsSizes();
+                hintsSizes.compute(descriptor.hintsFileName, (d, currentSize) 
->
+                                                             currentSize == 
null ? session.getBytesWritten()
+                                                                               
  : currentSize + session.getBytesWritten());
             }
         }
         catch (IOException e)
         {
-            throw new FSWriteError(e, writer.descriptor().fileName());
+            throw new FSWriteError(e, descriptor.fileName());
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index e755117488..aa5625e605 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2406,13 +2406,15 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
-        long actualTotalHintsSize = 
HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
-        boolean hasHintsReachedMaxSize = maxHintsSize > 0 && 
actualTotalHintsSize > maxHintsSize;
-        if (hasHintsReachedMaxSize)
+        if (maxHintsSize > 0)
         {
-            Tracing.trace("Not hinting {} which has reached to the max hints 
size {} bytes on disk. The actual hints size on disk: {}",
-                          endpoint, maxHintsSize, actualTotalHintsSize);
-            return false;
+            long actualTotalHintsSize = 
HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
+            if (actualTotalHintsSize > maxHintsSize)
+            {
+                Tracing.trace("Not hinting {} which has reached to the max 
hints size {} bytes on disk. The actual hints size on disk: {}",
+                              endpoint, maxHintsSize, actualTotalHintsSize);
+                return false;
+            }
         }
 
         return true;
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java 
b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index af73d1b009..30f0aa8e52 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -150,16 +150,14 @@ public class HintsCatalogTest
     {
         File directory = new File(testFolder.newFolder());
         UUID hostId = UUID.randomUUID();
-        long now = Clock.Global.currentTimeMillis();
         long totalSize = 0;
         HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
         HintsStore store = catalog.get(hostId);
         assertEquals(totalSize, store.getTotalFileSize());
         for (int i = 0; i < 3; i++)
         {
-            HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i);
-            writeDescriptor(directory, descriptor);
-            store.offerLast(descriptor);
+            store.getOrOpenWriter();
+            store.closeWriter();
             assertTrue("Total file size should increase after writing more 
hints", store.getTotalFileSize() > totalSize);
             totalSize = store.getTotalFileSize();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to