This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new d9460a0 Add non-blocking mode for CDC writes d9460a0 is described below commit d9460a04daee5fa97639abf2b6e28ff9b29cf636 Author: Yifan Cai <y...@apache.org> AuthorDate: Fri Dec 3 12:18:31 2021 -0800 Add non-blocking mode for CDC writes patch by Yifan Cai; reviewed by Josh McKenzie for CASSANDRA-17001 --- CHANGES.txt | 1 + NEWS.txt | 4 + src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 17 ++ .../apache/cassandra/db/commitlog/CommitLog.java | 24 ++ .../db/commitlog/CommitLogDescriptor.java | 18 +- .../cassandra/db/commitlog/CommitLogMBean.java | 4 + .../cassandra/db/commitlog/CommitLogSegment.java | 2 +- .../db/commitlog/CommitLogSegmentManagerCDC.java | 121 +++++++--- .../cassandra/utils/DirectorySizeCalculator.java | 10 +- .../db/commitlog/CommitLogDescriptorTest.java | 17 ++ .../commitlog/CommitLogSegmentManagerCDCTest.java | 262 ++++++++++++--------- 12 files changed, 327 insertions(+), 156 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 14a1623..91ff5a3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Add non-blocking mode for CDC writes (CASSANDRA-17001) * Add guardrails framework (CASSANDRA-17147) * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174) * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082) diff --git a/NEWS.txt b/NEWS.txt index 9707cce..f365a68 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool. New features ------------ + - CDC data flushing can now be configured to be non-blocking with the configuration cdc_block_writes. When set to true, + any writes to the CDC-enabled tables will be blocked when the limit for CDC data on disk is reached, which is the + existing and the default behavior.
When set to false, writes to the CDC-enabled tables will be accepted and + the oldest CDC data on disk will be deleted to stay within the size limit. - New native functions to convert unix time values into C* native types: toDate(bigint), toTimestamp(bigint), mintimeuuid(bigint) and maxtimeuuid(bigint) - Support for multiple permission in a single GRANT/REVOKE/LIST statement has been added. It allows to diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index cdf83b8..9fb57797 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -281,6 +281,9 @@ public class Config // Change-data-capture logs public boolean cdc_enabled = false; + // When true, new CDC mutations are rejected/blocked when reaching max CDC storage. + // When false, new CDC mutations are always accepted, and the oldest CDC commit log segment is deleted when CDC storage is full. + public volatile boolean cdc_block_writes = true; public String cdc_raw_directory; public int cdc_total_space_in_mb = 0; public int cdc_free_space_check_interval_ms = 250; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 8ed8612..d3abeb0 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2095,6 +2095,13 @@ public class DatabaseDescriptor return (int) ByteUnit.MEBI_BYTES.toBytes(conf.commitlog_segment_size_in_mb); } + /** + * Update commitlog_segment_size_in_mb in the tests. + * {@link CommitLogSegmentManagerCDC} uses the CommitLogSegmentSize to estimate the file size on allocation. + * It is important to keep the value unchanged for the estimation to be correct.
+ * @param sizeMegabytes + */ + @VisibleForTesting /* Only for testing */ public static void setCommitLogSegmentSize(int sizeMegabytes) { conf.commitlog_segment_size_in_mb = sizeMegabytes; @@ -3157,6 +3164,16 @@ public class DatabaseDescriptor conf.cdc_enabled = cdc_enabled; } + public static boolean getCDCBlockWrites() + { + return conf.cdc_block_writes; + } + + public static void setCDCBlockWrites(boolean val) + { + conf.cdc_block_writes = val; + } + public static String getCDCLogLocation() { return conf.cdc_raw_directory; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index a4be769..c605234 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -28,6 +28,7 @@ import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.io.util.File; +import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -420,6 +421,29 @@ public class CommitLog implements CommitLogMBean return segmentRatios; } + @Override + public boolean getCDCBlockWrites() + { + return DatabaseDescriptor.getCDCBlockWrites(); + } + + @Override + public void setCDCBlockWrites(boolean val) + { + Preconditions.checkState(DatabaseDescriptor.isCDCEnabled(), + "Unable to set block_writes (%s): CDC is not enabled.", val); + Preconditions.checkState(segmentManager instanceof CommitLogSegmentManagerCDC, + "CDC is enabled but we have the wrong CommitLogSegmentManager type: %s. 
" + + "Please report this as bug.", segmentManager.getClass().getName()); + boolean oldVal = DatabaseDescriptor.getCDCBlockWrites(); + CommitLogSegment currentSegment = segmentManager.allocatingFrom(); + // Update the current segment CDC state to PERMITTED if block_writes is disabled now, and it was in FORBIDDEN state + if (!val && currentSegment.getCDCState() == CommitLogSegment.CDCState.FORBIDDEN) + currentSegment.setCDCState(CommitLogSegment.CDCState.PERMITTED); + DatabaseDescriptor.setCDCBlockWrites(val); + logger.info("Updated CDC block_writes from {} to {}", oldVal, val); + } + /** * Shuts down the threads used by the commit log, blocking until completion. * TODO this should accept a timeout, and throw TimeoutException diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java index 9e95658..82207ee 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -35,6 +35,7 @@ import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSReadError; @@ -51,6 +52,7 @@ public class CommitLogDescriptor private static final String SEPARATOR = "-"; private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR; private static final String FILENAME_EXTENSION = ".log"; + private static final String INDEX_FILENAME_SUFFIX = "_cdc.idx"; // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log. 
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); @@ -220,7 +222,21 @@ public class CommitLogDescriptor public String cdcIndexFileName() { - return FILENAME_PREFIX + version + SEPARATOR + id + "_cdc.idx"; + return FILENAME_PREFIX + version + SEPARATOR + id + INDEX_FILENAME_SUFFIX; + } + + /** + * Infer the corresponding cdc index file using its cdc commitlog file + * @param cdcCommitLogSegment + * @return cdc index file or null if the cdc index file cannot be inferred. + */ + public static File inferCdcIndexFile(File cdcCommitLogSegment) + { + if (!isValid(cdcCommitLogSegment.name())) + return null; + String cdcFileName = cdcCommitLogSegment.name(); + String indexFileName = cdcFileName.substring(0, cdcFileName.length() - FILENAME_EXTENSION.length()) + INDEX_FILENAME_SUFFIX; + return new File(DatabaseDescriptor.getCDCLogLocation(), indexFileName); } /** diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java index 3b20bbc..7e8deca 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java @@ -84,4 +84,8 @@ public interface CommitLogMBean * @return A map between active log segments and the compression ratio achieved for each. 
*/ public Map<String, Double> getActiveSegmentCompressionRatios(); + + public boolean getCDCBlockWrites(); + + public void setCDCBlockWrites(boolean val); } diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 45678f5..06218f8 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -69,7 +69,7 @@ public abstract class CommitLogSegment FORBIDDEN, CONTAINS } - Object cdcStateLock = new Object(); + final Object cdcStateLock = new Object(); private final static AtomicInteger nextId = new AtomicInteger(1); private static long replayLimitId; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java index 6f6a1c2..4a2ddf2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -19,13 +19,16 @@ package org.apache.cassandra.db.commitlog; import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.BasicFileAttributes; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.io.util.File; import org.slf4j.Logger; @@ -67,20 +70,52 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager cdcSizeTracker.processDiscardedSegment(segment); if (delete) - FileUtils.deleteWithConfirm(segment.logFile); + segment.logFile.delete(); if 
(segment.getCDCState() != CDCState.CONTAINS) { // Always delete hard-link from cdc folder if this segment didn't contain CDC data. Note: File may not exist // if processing discard during startup. File cdcLink = segment.getCDCFile(); - if (cdcLink.exists()) - FileUtils.deleteWithConfirm(cdcLink); - File cdcIndexFile = segment.getCDCIndexFile(); - if (cdcIndexFile.exists()) - FileUtils.deleteWithConfirm(cdcIndexFile); + deleteCDCFiles(cdcLink, cdcIndexFile); + } + } + + /** + * Delete the oldest hard-linked CDC commit log segment to free up space. + * @return total deleted file size in bytes + */ + public long deleteOldestLinkedCDCCommitLogSegment() + { + File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation()); + Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist."); + File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name())); + Preconditions.checkState(files != null && files.length > 0, + "There should be at least 1 CDC commit log segment."); + List<File> sorted = Arrays.stream(files) + .sorted(Comparator.comparingLong(File::lastModified)) + .collect(Collectors.toList()); + File oldestCdcFile = sorted.get(0); + File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile); + return deleteCDCFiles(oldestCdcFile, cdcIndexFile); + } + + private long deleteCDCFiles(File cdcLink, File cdcIndexFile) + { + long total = 0; + if (cdcLink != null && cdcLink.exists()) + { + total += cdcLink.length(); + cdcLink.delete(); + } + + if (cdcIndexFile != null && cdcIndexFile.exists()) + { + total += cdcIndexFile.length(); + cdcIndexFile.delete(); } + return total; } /** @@ -170,7 +205,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager if (cdcFile.exists() && !cdcIndexFile.exists()) { logger.trace("(Unopened) CDC segment {} is no longer needed and will be deleted now", cdcFile); - FileUtils.deleteWithConfirm(cdcFile); + cdcFile.delete(); } } @@ -193,15 +228,15 @@ public class 
CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval()); private ExecutorService cdcSizeCalculationExecutor; - private CommitLogSegmentManagerCDC segmentManager; - - // Used instead of size during walk to remove chance of over-allocation - private volatile long sizeInProgress = 0; + private final CommitLogSegmentManagerCDC segmentManager; + // track the total size between two directory size calculations + private final AtomicLong sizeInProgress; CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path) { super(path); this.segmentManager = segmentManager; + this.sizeInProgress = new AtomicLong(0); } /** @@ -209,7 +244,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager */ public void start() { - size = 0; + sizeInProgress.getAndSet(0); cdcSizeCalculationExecutor = executorFactory().configureSequential("CDCSizeCalculationExecutor") .withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()) .withQueueLimit(0) @@ -221,7 +256,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads. * - * This can be reached either from the segment management thread in ABstractCommitLogSegmentManager or from the + * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the * size recalculation executor, so we synchronize on this object to reduce the race overlap window available for * size to get off.
* @@ -232,11 +267,26 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager // See synchronization in CommitLogSegment.setCDCState synchronized(segment.cdcStateLock) { - segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes() + int segmentSize = defaultSegmentSize(); + long allowance = allowableCDCBytes(); + boolean blocking = DatabaseDescriptor.getCDCBlockWrites(); + segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance ? CDCState.FORBIDDEN : CDCState.PERMITTED); + + // Remove the oldest cdc segment file when exceeding the CDC storage allowance + while (!blocking && segmentSize + sizeInProgress.get() > allowance) + { + long releasedSize = segmentManager.deleteOldestLinkedCDCCommitLogSegment(); + sizeInProgress.getAndAdd(-releasedSize); + logger.debug("Freed up {} bytes after deleting the oldest CDC commit log segment in non-blocking mode. " + + "Total on-disk CDC size: {}; allowed CDC size: {}", + releasedSize, sizeInProgress.get() + segmentSize, allowance); + } + + // Aggressively count in the (estimated) size of new segments. if (segment.getCDCState() == CDCState.PERMITTED) - size += defaultSegmentSize(); + sizeInProgress.getAndAdd(segmentSize); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. @@ -250,9 +300,13 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { // Add to flushed size before decrementing unflushed so we don't have a window of false generosity if (segment.getCDCState() == CDCState.CONTAINS) - size += segment.onDiskSize(); + sizeInProgress.getAndAdd(segment.onDiskSize()); + + // Subtract the (estimated) size of the segment from processNewSegment. + // For a segment that CONTAINS CDC data, we add its actual onDiskSize and remove the estimated size. + // For a segment that remains in PERMITTED, the file is to be deleted and the estimate should be returned.
if (segment.getCDCState() != CDCState.FORBIDDEN) - size -= defaultSegmentSize(); + sizeInProgress.getAndAdd(-defaultSegmentSize()); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. @@ -268,7 +322,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { try { - cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize()); + cdcSizeCalculationExecutor.submit(this::recalculateOverflowSize); } catch (RejectedExecutionException e) { @@ -287,6 +341,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager private int defaultSegmentSize() { + // CommitLogSegmentSize is only loaded from yaml. + // There is a setter but is used only for testing. return DatabaseDescriptor.getCommitLogSegmentSize(); } @@ -294,25 +350,17 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { try { + resetSize(); // The Arrays.stream approach is considerably slower on Windows than linux - sizeInProgress = 0; Files.walkFileTree(path.toPath(), this); - size = sizeInProgress; + sizeInProgress.getAndSet(getAllocatedSize()); } catch (IOException ie) { - CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie); + CommitLog.handleCommitError("Failed CDC Size Calculation", ie); } } - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException - { - sizeInProgress += attrs.size(); - return FileVisitResult.CONTINUE; - } - - public void shutdown() { if (cdcSizeCalculationExecutor != null && !cdcSizeCalculationExecutor.isShutdown()) @@ -323,12 +371,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager private void addSize(long toAdd) { - size += toAdd; - } - - private long totalCDCSizeOnDisk() - { - return size; + sizeInProgress.getAndAdd(toAdd); } } @@ -347,6 +390,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager } catch (InterruptedException e) {} - 
return cdcSizeTracker.totalCDCSizeOnDisk(); + return cdcSizeTracker.getAllocatedSize(); } } diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java index 97fc22e..f0cfdea 100644 --- a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java +++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java @@ -31,7 +31,7 @@ import org.apache.cassandra.io.util.File; public class DirectorySizeCalculator extends SimpleFileVisitor<Path> { - protected volatile long size = 0; + private volatile long size = 0; protected final File path; public DirectorySizeCalculator(File path) @@ -63,4 +63,12 @@ public class DirectorySizeCalculator extends SimpleFileVisitor<Path> { return size; } + + /** + * Reset the size to 0 in case that the size calculator is used multiple times + */ + protected void resetSize() + { + size = 0; + } } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java index 53c6769..87b5fb0 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java @@ -29,10 +29,12 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.config.TransparentDataEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileSegmentInputStream; import org.apache.cassandra.net.MessagingService; @@ -309,4 +311,19 @@ public class CommitLogDescriptorTest CommitLogDescriptor desc2 = new 
CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption); Assert.assertEquals(desc1, desc2); } + + @Test + public void testInferCDCIndexFile() + { + DatabaseDescriptor.daemonInitialization(); + String fileNameSuffix = "CommitLog-2-1340512736956320000"; + File validCdcLink = new File(fileNameSuffix + ".log"); + File inferredIndexFile = CommitLogDescriptor.inferCdcIndexFile(validCdcLink); + Assert.assertNotNull(inferredIndexFile); + Assert.assertEquals(fileNameSuffix + "_cdc.idx", inferredIndexFile.name()); + + File invalidCdcLink = new File(fileNameSuffix + ".invalidlog"); + inferredIndexFile = CommitLogDescriptor.inferCdcIndexFile(invalidCdcLink); + Assert.assertNull(inferredIndexFile); + } } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java index cbfdadb..a6e5ab1 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java @@ -65,31 +65,8 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester @Test public void testCDCWriteFailure() throws Throwable { - createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); - CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; - TableMetadata cfm = currentTableMetadata(); - - // Confirm that logic to check for whether or not we can allocate new CDC segments works - Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB(); - try - { - DatabaseDescriptor.setCDCSpaceInMB(32); - // Spin until we hit CDC capacity and make sure we get a CDCWriteException - try - { - // Should trigger on anything < 20:1 compression ratio during compressed test - for (int i = 0; i < 100; i++) - { - new RowUpdateBuilder(cfm, 0, i) - .add("data", 
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) - .build().apply(); - } - Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); - } - catch (CDCWriteException e) - { - // expected, do nothing - } + testWithCDCSpaceInMb(32, () -> { + createTableAndBulkWrite(); expectCurrentCDCState(CDCState.FORBIDDEN); // Confirm we can create a non-cdc table and write to it even while at cdc capacity @@ -97,6 +74,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester execute("INSERT INTO %s (idx, data) VALUES (1, '1');"); // Confirm that, on flush+recyle, we see files show up in cdc_raw + CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush(); CommitLog.instance.forceRecycleAllSegments(); cdcMgr.awaitManagementTasksCompletion(); @@ -109,57 +87,55 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester // Update size tracker to reflect deleted files. Should flip flag on current allocatingFrom to allow. 
cdcMgr.updateCDCTotalSize(); expectCurrentCDCState(CDCState.PERMITTED); - } - finally - { - DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize); - } + }); } @Test public void testSegmentFlaggingOnCreation() throws Throwable { - CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; - String ct = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); - - int origSize = DatabaseDescriptor.getCDCSpaceInMB(); - try - { - DatabaseDescriptor.setCDCSpaceInMB(16); - TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata(); - // Spin until we hit CDC capacity and make sure we get a CDCWriteException - try - { - for (int i = 0; i < 1000; i++) - { - new RowUpdateBuilder(ccfm, 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) - .build().apply(); - } - Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); - } - catch (CDCWriteException e) { } - - expectCurrentCDCState(CDCState.FORBIDDEN); - CommitLog.instance.forceRecycleAllSegments(); + testSegmentFlaggingOnCreation0(); + } - cdcMgr.awaitManagementTasksCompletion(); - // Delete all files in cdc_raw - for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) - f.tryDelete(); - cdcMgr.updateCDCTotalSize(); - // Confirm cdc update process changes flag on active segment - expectCurrentCDCState(CDCState.PERMITTED); + @Test + public void testSegmentFlaggingWithNonblockingOnCreation() throws Throwable + { + testWithNonblockingMode(this::testSegmentFlaggingOnCreation0); + } - // Clear out archived CDC files - for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) { - FileUtils.deleteWithConfirm(f); - } - } - finally - { - DatabaseDescriptor.setCDCSpaceInMB(origSize); + @Test + public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable + { + final int commitlogSize = DatabaseDescriptor.getCommitLogSegmentSize() / 1024 / 
1024; + final int cdcSizeLimit = commitlogSize + 1; + // Clear out all CDC files + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) { + FileUtils.deleteWithConfirm(f); } + testWithNonblockingMode(() -> testWithCDCSpaceInMb(cdcSizeLimit, () -> { + CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; + Assert.assertEquals(0, cdcMgr.updateCDCTotalSize()); + + createTableAndBulkWrite(); + + // Only the current commit log will be kept. + // The older ones are deleted immediately on creating a new segment due to exceeding size limit. + long actualSize = cdcMgr.updateCDCTotalSize(); + Assert.assertTrue(actualSize <= cdcSizeLimit * 1024 * 1024); + Assert.assertTrue(actualSize >= DatabaseDescriptor.getCommitLogSegmentSize()); + })); + } + + @Test // switch from blocking to nonblocking, then back to blocking + public void testSwitchingCDCWriteModes() throws Throwable + { + String tableName = createTableAndBulkWrite(); + expectCurrentCDCState(CDCState.FORBIDDEN); + testWithNonblockingMode(() -> { + bulkWrite(tableName); + expectCurrentCDCState(CDCState.CONTAINS); + }); + bulkWrite(tableName); + expectCurrentCDCState(CDCState.FORBIDDEN); } @Test @@ -187,30 +163,12 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester } @Test - public void testCompletedFlag() throws IOException + public void testCompletedFlag() throws Throwable { - createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + String tableName = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom(); - Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB(); - DatabaseDescriptor.setCDCSpaceInMB(8); - try - { - for (int i = 0; i < 1000; i++) - { - new RowUpdateBuilder(currentTableMetadata(), 0, 1) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) - 
.build().apply(); - } - } - catch (CDCWriteException ce) - { - // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay - } - finally - { - DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize); - } + testWithCDCSpaceInMb(8, () -> bulkWrite(tableName)); CommitLog.instance.forceRecycleAllSegments(); @@ -280,32 +238,10 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester } @Test - public void testReplayLogic() throws IOException + public void testReplayLogic() throws Throwable { - // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length); - String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); - Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB(); - - DatabaseDescriptor.setCDCSpaceInMB(8); - TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata(); - try - { - for (int i = 0; i < 1000; i++) - { - new RowUpdateBuilder(ccfm, 0, i) - .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) - .build().apply(); - } - Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); - } - catch (CDCWriteException e) - { - // pass - } - finally - { - DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize); - } + // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).tryList().length); + testWithCDCSpaceInMb(8, this::createTableAndBulkWrite); CommitLog.instance.sync(true); CommitLog.instance.stopUnsafe(false); @@ -449,4 +385,102 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester expectedState, currentState)); } } + + private void testWithNonblockingMode(Testable test) throws Throwable + { + boolean original = DatabaseDescriptor.getCDCBlockWrites(); + CommitLog.instance.setCDCBlockWrites(false); + try + { + test.run(); + } + catch (Throwable e) + { + e.printStackTrace(); + } + finally + { + 
CommitLog.instance.setCDCBlockWrites(original); + } + } + + private void testWithCDCSpaceInMb(int size, Testable test) throws Throwable + { + int origSize = DatabaseDescriptor.getCDCSpaceInMB(); + DatabaseDescriptor.setCDCSpaceInMB(size); + try + { + test.run(); + } + finally + { + DatabaseDescriptor.setCDCSpaceInMB(origSize); + } + } + + private String createTableAndBulkWrite() throws Throwable + { + String tableName = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;"); + bulkWrite(tableName); + return tableName; + } + + private void bulkWrite(String tableName) throws Throwable + { + TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(tableName).metadata(); + boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites(); + // Spin to make sure we hit CDC capacity + try + { + for (int i = 0; i < 1000; i++) + { + new RowUpdateBuilder(ccfm, 0, i) + .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3)) + .build().applyFuture().get(); + } + if (blockWrites) + Assert.fail("Expected CDCWriteException from full CDC but did not receive it."); + } + catch (CDCWriteException e) + { + if (!blockWrites) + Assert.fail("Excepted no CDCWriteException when not blocking writes but received it."); + } + } + + private void testSegmentFlaggingOnCreation0() throws Throwable + { + testWithCDCSpaceInMb(16, () -> { + boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites(); + + createTableAndBulkWrite(); + + CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager; + expectCurrentCDCState(blockWrites? 
CDCState.FORBIDDEN : CDCState.CONTAINS); + + // When block writes, releasing CDC commit logs should update the CDC state to PERMITTED + if (blockWrites) + { + CommitLog.instance.forceRecycleAllSegments(); + + cdcMgr.awaitManagementTasksCompletion(); + // Delete all files in cdc_raw + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) + f.delete(); + cdcMgr.updateCDCTotalSize(); + // Confirm cdc update process changes flag on active segment + expectCurrentCDCState(CDCState.PERMITTED); + } + + // Clear out archived CDC files + for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) { + FileUtils.deleteWithConfirm(f); + } + }); + } + + private interface Testable + { + void run() throws Throwable; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org