This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch CASSANDRA-19477-trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a79679ebf1bd3e07e1ddb2fd2f512f8cfb800387 Author: Stefan Miklosovic <smikloso...@apache.org> AuthorDate: Sun Mar 17 02:24:12 2024 +0100 CASSANDRA-19477 --- .../apache/cassandra/hints/HintsDescriptor.java | 8 ++++-- .../org/apache/cassandra/hints/HintsStore.java | 29 ++++++++++++++++++---- .../apache/cassandra/hints/HintsWriteExecutor.java | 25 +++++++++++++++---- .../org/apache/cassandra/service/StorageProxy.java | 14 ++++++----- .../apache/cassandra/hints/HintsCatalogTest.java | 6 ++--- 5 files changed, 60 insertions(+), 22 deletions(-) diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index 3079c0b80c..e28ebd53b0 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -81,6 +81,8 @@ final class HintsDescriptor final UUID hostId; final int version; final long timestamp; + final String hintsFileName; + final String crc32FileName; final ImmutableMap<String, Object> parameters; final ParameterizedClass compressionConfig; @@ -93,6 +95,8 @@ final class HintsDescriptor this.hostId = hostId; this.version = version; this.timestamp = timestamp; + this.hintsFileName = hostId + "-" + timestamp + "-" + version + ".hints"; + this.crc32FileName = hostId + "-" + timestamp + "-" + version + ".crc32"; compressionConfig = createCompressionConfig(parameters); EncryptionData encryption = createEncryption(parameters); @@ -205,12 +209,12 @@ final class HintsDescriptor String fileName() { - return String.format("%s-%s-%s.hints", hostId, timestamp, version); + return hintsFileName; } String checksumFileName() { - return String.format("%s-%s-%s.crc32", hostId, timestamp, version); + return crc32FileName; } File file(File hintsDirectory) diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java index b7850ee2b4..ed06474427 100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@ -61,6 +61,7 @@ final class HintsStore private final Deque<HintsDescriptor> dispatchDequeue; private final Queue<HintsDescriptor> corruptedFiles; private final Map<HintsDescriptor, Long> hintsExpirations; + private final Map<String, Long> hintsSizes; // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors. private volatile long lastUsedTimestamp; @@ -76,9 +77,13 @@ final class HintsStore dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors); corruptedFiles = new ConcurrentLinkedQueue<>(); hintsExpirations = new ConcurrentHashMap<>(); + hintsSizes = new ConcurrentHashMap<>(); //noinspection resource lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L); + + for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles)) + hintsSizes.put(descriptor.hintsFileName, descriptor.file(hintsDirectory).length()); } static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors) @@ -87,6 +92,11 @@ final class HintsStore return new HintsStore(hostId, hintsDirectory, writerParams, descriptors); } + Map<String, Long> getHintsSizes() + { + return hintsSizes; + } + @VisibleForTesting int getDispatchQueueSize() { @@ -203,6 +213,9 @@ final class HintsStore { dispatchDequeue.removeAll(removeSet); corruptedFiles.removeAll(removeSet); + // if delete(descriptor) throws, hence + for (HintsDescriptor descriptor : removeSet) + hintsSizes.remove(descriptor.fileName()); } } @@ -210,7 +223,11 @@ final class HintsStore { File hintsFile = descriptor.file(hintsDirectory); if (hintsFile.tryDelete()) + { + // this method is also called standalone (without deleteHints) + hintsSizes.remove(descriptor.fileName()); logger.info("Deleted hint file {}", descriptor.fileName()); + } else if (hintsFile.exists()) logger.error("Failed to delete hint file {}", descriptor.fileName()); else @@ -235,17 +252,15 @@ final class HintsStore dispatchPositions.put(descriptor, inputPosition); } - /** * @return the total size of all files belonging to the hints store, in bytes. */ long getTotalFileSize() { long total = 0; - for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, corruptedFiles)) - { - total += descriptor.file(hintsDirectory).length(); - } + for (long value : hintsSizes.values()) + total += value; + return total; } @@ -282,7 +297,11 @@ final class HintsStore HintsWriter getOrOpenWriter() { if (hintsWriter == null) + { hintsWriter = openWriter(); + hintsSizes.put(hintsWriter.descriptor().fileName(), hintsWriter.descriptor().file(hintsDirectory).length()); + } + return hintsWriter; } diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java index 156fbfc018..3cd4bfb43a 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java @@ -20,6 +20,7 @@ package org.apache.cassandra.hints; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -260,19 +261,33 @@ final class HintsWriteExecutor long maxHintsFileSize = DatabaseDescriptor.getMaxHintsFileSize(); HintsWriter writer = store.getOrOpenWriter(); + HintsDescriptor descriptor = writer.descriptor(); + // If the creation of a new session throws, we have not written anything anyway, + // so we do not need to update hints sizes. + // If we throw during appending, we will track how many bytes were written in finally block try (HintsWriter.Session session = writer.newSession(writeBuffer)) { - while (iterator.hasNext()) + try + { + while (iterator.hasNext()) + { + session.append(iterator.next()); + if (session.position() >= maxHintsFileSize) + break; + } + } + finally { - session.append(iterator.next()); - if (session.position() >= maxHintsFileSize) - break; + Map<String, Long> hintsSizes = store.getHintsSizes(); + hintsSizes.compute(descriptor.hintsFileName, (d, currentSize) -> + currentSize == null ? session.getBytesWritten() + : currentSize + session.getBytesWritten()); } } catch (IOException e) { - throw new FSWriteError(e, writer.descriptor().fileName()); + throw new FSWriteError(e, descriptor.fileName()); } } } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e755117488..aa5625e605 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2406,13 +2406,15 @@ public class StorageProxy implements StorageProxyMBean } long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost(); - long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint); - boolean hasHintsReachedMaxSize = maxHintsSize > 0 && actualTotalHintsSize > maxHintsSize; - if (hasHintsReachedMaxSize) + if (maxHintsSize > 0) { - Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", - endpoint, maxHintsSize, actualTotalHintsSize); - return false; + long actualTotalHintsSize = HintsService.instance.getTotalHintsSize(hostIdForEndpoint); + if (actualTotalHintsSize > maxHintsSize) + { + Tracing.trace("Not hinting {} which has reached to the max hints size {} bytes on disk. The actual hints size on disk: {}", + endpoint, maxHintsSize, actualTotalHintsSize); + return false; + } } return true; diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java index af73d1b009..30f0aa8e52 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -150,16 +150,14 @@ public class HintsCatalogTest { File directory = new File(testFolder.newFolder()); UUID hostId = UUID.randomUUID(); - long now = Clock.Global.currentTimeMillis(); long totalSize = 0; HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of()); HintsStore store = catalog.get(hostId); assertEquals(totalSize, store.getTotalFileSize()); for (int i = 0; i < 3; i++) { - HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i); - writeDescriptor(directory, descriptor); - store.offerLast(descriptor); + store.getOrOpenWriter(); + store.closeWriter(); assertTrue("Total file size should increase after writing more hints", store.getTotalFileSize() > totalSize); totalSize = store.getTotalFileSize(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org