This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9460a0  Add non-blocking mode for CDC writes
d9460a0 is described below

commit d9460a04daee5fa97639abf2b6e28ff9b29cf636
Author: Yifan Cai <y...@apache.org>
AuthorDate: Fri Dec 3 12:18:31 2021 -0800

    Add non-blocking mode for CDC writes
    
    patch by Yifan Cai; reviewed by Josh McKenzie for CASSANDRA-17001
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  17 ++
 .../apache/cassandra/db/commitlog/CommitLog.java   |  24 ++
 .../db/commitlog/CommitLogDescriptor.java          |  18 +-
 .../cassandra/db/commitlog/CommitLogMBean.java     |   4 +
 .../cassandra/db/commitlog/CommitLogSegment.java   |   2 +-
 .../db/commitlog/CommitLogSegmentManagerCDC.java   | 121 +++++++---
 .../cassandra/utils/DirectorySizeCalculator.java   |  10 +-
 .../db/commitlog/CommitLogDescriptorTest.java      |  17 ++
 .../commitlog/CommitLogSegmentManagerCDCTest.java  | 262 ++++++++++++---------
 12 files changed, 327 insertions(+), 156 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 14a1623..91ff5a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add non-blocking mode for CDC writes (CASSANDRA-17001)
  * Add guardrails framework (CASSANDRA-17147)
  * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
  * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
diff --git a/NEWS.txt b/NEWS.txt
index 9707cce..f365a68 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - CDC data flushing can now be configured to be non-blocking with the configuration cdc_block_writes. When set to true,
+      any writes to CDC-enabled tables are blocked when the limit for CDC data on disk is reached, which is the
+      existing and default behavior. When set to false, writes to CDC-enabled tables are accepted and
+      the oldest CDC data on disk is deleted to keep within the size constraint.
    - New native functions to convert unix time values into C* native types: toDate(bigint), toTimestamp(bigint),
       mintimeuuid(bigint) and maxtimeuuid(bigint)
    - Support for multiple permission in a single GRANT/REVOKE/LIST statement has been added. It allows to
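
For reference, here is a minimal sketch of driving the new knob in-process; it mirrors the test helper added further below and assumes an already-initialized daemon or CQLTester-style test context with cdc_enabled set to true (it is not a standalone program):

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.db.commitlog.CommitLog;

    public class CdcBlockWritesToggleSketch
    {
        public static void toggleNonBlocking()
        {
            // Precondition enforced by CommitLog.setCDCBlockWrites: cdc_enabled must be true.
            assert DatabaseDescriptor.isCDCEnabled();

            // Switch to non-blocking mode: once cdc_total_space_in_mb is exceeded, the oldest
            // CDC segment on disk is deleted instead of rejecting writes with CDCWriteException.
            CommitLog.instance.setCDCBlockWrites(false);

            // Read the effective value back; it is also exposed through the CommitLog MBean.
            boolean blocking = DatabaseDescriptor.getCDCBlockWrites(); // false
            System.out.println("cdc_block_writes = " + blocking);
        }
    }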
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index cdf83b8..9fb57797 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,6 +281,9 @@ public class Config
 
     // Change-data-capture logs
     public boolean cdc_enabled = false;
+    // When true, new CDC mutations are rejected/blocked when reaching max CDC storage.
+    // When false, new CDC mutations can always be added, but the oldest CDC commit log segment is removed when the storage is full.
+    public volatile boolean cdc_block_writes = true;
     public String cdc_raw_directory;
     public int cdc_total_space_in_mb = 0;
     public int cdc_free_space_check_interval_ms = 250;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8ed8612..d3abeb0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2095,6 +2095,13 @@ public class DatabaseDescriptor
        return (int) ByteUnit.MEBI_BYTES.toBytes(conf.commitlog_segment_size_in_mb);
     }
 
+    /**
+     * Update commitlog_segment_size_in_mb in the tests.
+     * {@link CommitLogSegmentManagerCDC} uses the CommitLogSegmentSize to estimate the file size on allocation.
+     * It is important to keep the value unchanged for the estimation to be correct.
+     * @param sizeMegabytes
+     */
+    @VisibleForTesting
     public static void setCommitLogSegmentSize(int sizeMegabytes)
     {
         conf.commitlog_segment_size_in_mb = sizeMegabytes;
@@ -3157,6 +3164,16 @@ public class DatabaseDescriptor
         conf.cdc_enabled = cdc_enabled;
     }
 
+    public static boolean getCDCBlockWrites()
+    {
+        return conf.cdc_block_writes;
+    }
+
+    public static void setCDCBlockWrites(boolean val)
+    {
+        conf.cdc_block_writes = val;
+    }
+
     public static String getCDCLogLocation()
     {
         return conf.cdc_raw_directory;
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a4be769..c605234 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -28,6 +28,7 @@ import java.util.zip.CRC32;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.io.util.File;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -420,6 +421,29 @@ public class CommitLog implements CommitLogMBean
         return segmentRatios;
     }
 
+    @Override
+    public boolean getCDCBlockWrites()
+    {
+        return DatabaseDescriptor.getCDCBlockWrites();
+    }
+
+    @Override
+    public void setCDCBlockWrites(boolean val)
+    {
+        Preconditions.checkState(DatabaseDescriptor.isCDCEnabled(),
+                                 "Unable to set block_writes (%s): CDC is not enabled.", val);
+        Preconditions.checkState(segmentManager instanceof CommitLogSegmentManagerCDC,
+                                 "CDC is enabled but we have the wrong CommitLogSegmentManager type: %s. " +
+                                 "Please report this as a bug.", segmentManager.getClass().getName());
+        boolean oldVal = DatabaseDescriptor.getCDCBlockWrites();
+        CommitLogSegment currentSegment = segmentManager.allocatingFrom();
+        // Update the current segment CDC state to PERMITTED if block_writes is disabled now, and it was in FORBIDDEN state
+        if (!val && currentSegment.getCDCState() == CommitLogSegment.CDCState.FORBIDDEN)
+            currentSegment.setCDCState(CommitLogSegment.CDCState.PERMITTED);
+        DatabaseDescriptor.setCDCBlockWrites(val);
+        logger.info("Updated CDC block_writes from {} to {}", oldVal, val);
+    }
+
     /**
      * Shuts down the threads used by the commit log, blocking until completion.
      * TODO this should accept a timeout, and throw TimeoutException
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 9e95658..82207ee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -35,6 +35,7 @@ import java.util.zip.CRC32;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
@@ -51,6 +52,7 @@ public class CommitLogDescriptor
     private static final String SEPARATOR = "-";
     private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
     private static final String FILENAME_EXTENSION = ".log";
+    private static final String INDEX_FILENAME_SUFFIX = "_cdc.idx";
     // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
     private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
 
@@ -220,7 +222,21 @@ public class CommitLogDescriptor
 
     public String cdcIndexFileName()
     {
-        return FILENAME_PREFIX + version + SEPARATOR + id + "_cdc.idx";
+        return FILENAME_PREFIX + version + SEPARATOR + id + INDEX_FILENAME_SUFFIX;
+    }
+
+    /**
+     * Infer the corresponding cdc index file using its cdc commitlog file
+     * @param cdcCommitLogSegment
+     * @return cdc index file or null if the cdc index file cannot be inferred.
+     */
+    public static File inferCdcIndexFile(File cdcCommitLogSegment)
+    {
+        if (!isValid(cdcCommitLogSegment.name()))
+            return null;
+        String cdcFileName = cdcCommitLogSegment.name();
+        String indexFileName = cdcFileName.substring(0, cdcFileName.length() - FILENAME_EXTENSION.length()) + INDEX_FILENAME_SUFFIX;
+        return new File(DatabaseDescriptor.getCDCLogLocation(), indexFileName);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 3b20bbc..7e8deca 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -84,4 +84,8 @@ public interface CommitLogMBean
      * @return A map between active log segments and the compression ratio achieved for each.
      */
     public Map<String, Double> getActiveSegmentCompressionRatios();
+
+    public boolean getCDCBlockWrites();
+
+    public void setCDCBlockWrites(boolean val);
 }
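
Since the two methods above are exposed on CommitLogMBean, the mode can also be flipped remotely over JMX. A hedged sketch of such a client follows; the MBean ObjectName ("org.apache.cassandra.db:type=Commitlog") and the default JMX port 7199 are assumptions about a typical deployment and should be adjusted to your cluster:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.db.commitlog.CommitLogMBean;

    public class CdcBlockWritesJmxClient
    {
        public static void main(String[] args) throws Exception
        {
            // Assumed local JMX endpoint; host and port depend on the node's configuration.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                // Assumed ObjectName under which the CommitLog MBean is registered.
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=Commitlog");
                CommitLogMBean commitLog = JMX.newMBeanProxy(connection, name, CommitLogMBean.class);

                // Disable blocking: CDC writes are accepted and the oldest CDC segment is
                // reclaimed once cdc_total_space_in_mb is exceeded.
                commitLog.setCDCBlockWrites(false);
                System.out.println("cdc_block_writes = " + commitLog.getCDCBlockWrites());
            }
        }
    }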
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 45678f5..06218f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -69,7 +69,7 @@ public abstract class CommitLogSegment
         FORBIDDEN,
         CONTAINS
     }
-    Object cdcStateLock = new Object();
+    final Object cdcStateLock = new Object();
 
     private final static AtomicInteger nextId = new AtomicInteger(1);
     private static long replayLimitId;
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index 6f6a1c2..4a2ddf2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -19,13 +19,16 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.IOException;
-import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.RateLimiter;
 import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
@@ -67,20 +70,52 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         cdcSizeTracker.processDiscardedSegment(segment);
 
         if (delete)
-            FileUtils.deleteWithConfirm(segment.logFile);
+            segment.logFile.delete();
 
         if (segment.getCDCState() != CDCState.CONTAINS)
         {
            // Always delete hard-link from cdc folder if this segment didn't contain CDC data. Note: File may not exist
             // if processing discard during startup.
             File cdcLink = segment.getCDCFile();
-            if (cdcLink.exists())
-                FileUtils.deleteWithConfirm(cdcLink);
-
             File cdcIndexFile = segment.getCDCIndexFile();
-            if (cdcIndexFile.exists())
-                FileUtils.deleteWithConfirm(cdcIndexFile);
+            deleteCDCFiles(cdcLink, cdcIndexFile);
+        }
+    }
+
+    /**
+     * Delete the oldest hard-linked CDC commit log segment to free up space.
+     * @return total deleted file size in bytes
+     */
+    public long deleteOldestLinkedCDCCommitLogSegment()
+    {
+        File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
+        Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
+        File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
+        Preconditions.checkState(files != null && files.length > 0,
+                                 "There should be at least 1 CDC commit log segment.");
+        List<File> sorted = Arrays.stream(files)
+                                  .sorted(Comparator.comparingLong(File::lastModified))
+                                  .collect(Collectors.toList());
+        File oldestCdcFile = sorted.get(0);
+        File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(oldestCdcFile);
+        return deleteCDCFiles(oldestCdcFile, cdcIndexFile);
+    }
+
+    private long deleteCDCFiles(File cdcLink, File cdcIndexFile)
+    {
+        long total = 0;
+        if (cdcLink != null && cdcLink.exists())
+        {
+            total += cdcLink.length();
+            cdcLink.delete();
+        }
+
+        if (cdcIndexFile != null && cdcIndexFile.exists())
+        {
+            total += cdcIndexFile.length();
+            cdcIndexFile.delete();
         }
+        return total;
     }
 
     /**
@@ -170,7 +205,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         if (cdcFile.exists() && !cdcIndexFile.exists())
         {
            logger.trace("(Unopened) CDC segment {} is no longer needed and will be deleted now", cdcFile);
-            FileUtils.deleteWithConfirm(cdcFile);
+            cdcFile.delete();
         }
     }
 
@@ -193,15 +228,15 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
     {
        private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval());
         private ExecutorService cdcSizeCalculationExecutor;
-        private CommitLogSegmentManagerCDC segmentManager;
-
-        // Used instead of size during walk to remove chance of over-allocation
-        private volatile long sizeInProgress = 0;
+        private final CommitLogSegmentManagerCDC segmentManager;
+        // track the total size between two directory size calculations
+        private final AtomicLong sizeInProgress;
 
         CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
         {
             super(path);
             this.segmentManager = segmentManager;
+            this.sizeInProgress = new AtomicLong(0);
         }
 
         /**
@@ -209,7 +244,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
          */
         public void start()
         {
-            size = 0;
+            sizeInProgress.getAndSet(0);
            cdcSizeCalculationExecutor = executorFactory().configureSequential("CDCSizeCalculationExecutor")
                                                          .withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy())
                                                           .withQueueLimit(0)
@@ -221,7 +256,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         * Synchronous size recalculation on each segment creation/deletion call could lead to very long delays in new
         * segment allocation, thus long delays in thread signaling to wake waiting allocation / writer threads.
          *
-         * This can be reached either from the segment management thread in ABstractCommitLogSegmentManager or from the
+         * This can be reached either from the segment management thread in AbstractCommitLogSegmentManager or from the
          * size recalculation executor, so we synchronize on this object to reduce the race overlap window available for
          * size to get off.
          *
@@ -232,11 +267,26 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
             // See synchronization in CommitLogSegment.setCDCState
             synchronized(segment.cdcStateLock)
             {
-                segment.setCDCState(defaultSegmentSize() + totalCDCSizeOnDisk() > allowableCDCBytes()
+                int segmentSize = defaultSegmentSize();
+                long allowance = allowableCDCBytes();
+                boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
+                segment.setCDCState(blocking && segmentSize + sizeInProgress.get() > allowance
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
+
+                // Remove the oldest cdc segment file when exceeding the CDC storage allowance
+                while (!blocking && segmentSize + sizeInProgress.get() > allowance)
+                {
+                    long releasedSize = segmentManager.deleteOldestLinkedCDCCommitLogSegment();
+                    sizeInProgress.getAndAdd(-releasedSize);
+                    logger.debug("Freed up {} bytes after deleting the oldest CDC commit log segment in non-blocking mode. " +
+                                 "Total on-disk CDC size: {}; allowed CDC size: {}",
+                                 releasedSize, sizeInProgress.get() + segmentSize, allowance);
+                }
+
+                // Aggressively count in the (estimated) size of new segments.
                 if (segment.getCDCState() == CDCState.PERMITTED)
-                    size += defaultSegmentSize();
+                    sizeInProgress.getAndAdd(segmentSize);
             }
 
            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
@@ -250,9 +300,13 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
             {
                // Add to flushed size before decrementing unflushed so we don't have a window of false generosity
                 if (segment.getCDCState() == CDCState.CONTAINS)
-                    size += segment.onDiskSize();
+                    sizeInProgress.getAndAdd(segment.onDiskSize());
+
+                // Subtract the (estimated) size of the segment from processNewSegment.
+                // For the segment that CONTAINS, we update by adding the actual onDiskSize and removing the estimated size.
+                // For the segment that remains in PERMITTED, the file is to be deleted and the estimate should be returned.
                 if (segment.getCDCState() != CDCState.FORBIDDEN)
-                    size -= defaultSegmentSize();
+                    sizeInProgress.getAndAdd(-defaultSegmentSize());
             }
 
            // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
@@ -268,7 +322,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         {
             try
             {
-                cdcSizeCalculationExecutor.submit(() -> recalculateOverflowSize());
+                cdcSizeCalculationExecutor.submit(this::recalculateOverflowSize);
             }
             catch (RejectedExecutionException e)
             {
@@ -287,6 +341,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
 
         private int defaultSegmentSize()
         {
+            // CommitLogSegmentSize is only loaded from yaml.
+            // There is a setter, but it is used only for testing.
             return DatabaseDescriptor.getCommitLogSegmentSize();
         }
 
@@ -294,25 +350,17 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         {
             try
             {
+                resetSize();
                // The Arrays.stream approach is considerably slower on Windows than linux
-                sizeInProgress = 0;
                 Files.walkFileTree(path.toPath(), this);
-                size = sizeInProgress;
+                sizeInProgress.getAndSet(getAllocatedSize());
             }
             catch (IOException ie)
             {
-                CommitLog.instance.handleCommitError("Failed CDC Size Calculation", ie);
+                CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
             }
         }
 
-        @Override
-        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
-        {
-            sizeInProgress += attrs.size();
-            return FileVisitResult.CONTINUE;
-        }
-
-
         public void shutdown()
         {
            if (cdcSizeCalculationExecutor != null && !cdcSizeCalculationExecutor.isShutdown())
@@ -323,12 +371,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
 
         private void addSize(long toAdd)
         {
-            size += toAdd;
-        }
-
-        private long totalCDCSizeOnDisk()
-        {
-            return size;
+            sizeInProgress.getAndAdd(toAdd);
         }
     }
 
@@ -347,6 +390,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         }
         catch (InterruptedException e) {}
 
-        return cdcSizeTracker.totalCDCSizeOnDisk();
+        return cdcSizeTracker.getAllocatedSize();
     }
 }
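
To make the non-blocking accounting in CDCSizeTracker.processNewSegment concrete, here is a small back-of-the-envelope sketch with hypothetical numbers (a 32 MiB segment size and a 33 MiB CDC allowance, roughly the setup used by testNonblockingShouldMaintainSteadyDiskUsage below):

    public class CdcNonBlockingAccountingSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical values: commitlog_segment_size_in_mb = 32, cdc_total_space_in_mb = 33.
            long allowance      = 33L * 1024 * 1024; // allowableCDCBytes()
            long segmentSize    = 32L * 1024 * 1024; // defaultSegmentSize(), the per-segment estimate
            long sizeInProgress = 32L * 1024 * 1024; // one CDC segment already linked on disk

            // processNewSegment(): in non-blocking mode the estimate exceeds the allowance ...
            System.out.println("over limit: " + (segmentSize + sizeInProgress > allowance)); // true

            // ... so deleteOldestLinkedCDCCommitLogSegment() reclaims the oldest linked segment ...
            sizeInProgress -= 32L * 1024 * 1024;

            // ... and the new segment now fits and is flagged PERMITTED instead of FORBIDDEN.
            System.out.println("fits after deletion: " + (segmentSize + sizeInProgress <= allowance)); // true
        }
    }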
diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
index 97fc22e..f0cfdea 100644
--- a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
+++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.util.File;
 
 public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
 {
-    protected volatile long size = 0;
+    private volatile long size = 0;
     protected final File path;
 
     public DirectorySizeCalculator(File path)
@@ -63,4 +63,12 @@ public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
     {
         return size;
     }
+
+    /**
+     * Reset the size to 0 in case the size calculator is used multiple times
+     */
+    protected void resetSize()
+    {
+        size = 0;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 53c6769..87b5fb0 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@ -29,10 +29,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileSegmentInputStream;
 import org.apache.cassandra.net.MessagingService;
@@ -309,4 +311,19 @@ public class CommitLogDescriptorTest
         CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
         Assert.assertEquals(desc1, desc2);
     }
+
+    @Test
+    public void testInferCDCIndexFile()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        String fileNameSuffix = "CommitLog-2-1340512736956320000";
+        File validCdcLink = new File(fileNameSuffix + ".log");
+        File inferredIndexFile = CommitLogDescriptor.inferCdcIndexFile(validCdcLink);
+        Assert.assertNotNull(inferredIndexFile);
+        Assert.assertEquals(fileNameSuffix + "_cdc.idx", inferredIndexFile.name());
+
+        File invalidCdcLink = new File(fileNameSuffix + ".invalidlog");
+        inferredIndexFile = CommitLogDescriptor.inferCdcIndexFile(invalidCdcLink);
+        Assert.assertNull(inferredIndexFile);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index cbfdadb..a6e5ab1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -65,31 +65,8 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     @Test
     public void testCDCWriteFailure() throws Throwable
     {
-        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
-        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
-        TableMetadata cfm = currentTableMetadata();
-
-        // Confirm that logic to check for whether or not we can allocate new CDC segments works
-        Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
-        try
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(32);
-            // Spin until we hit CDC capacity and make sure we get a CDCWriteException
-            try
-            {
-                // Should trigger on anything < 20:1 compression ratio during compressed test
-                for (int i = 0; i < 100; i++)
-                {
-                    new RowUpdateBuilder(cfm, 0, i)
-                        .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
-                        .build().apply();
-                }
-                Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
-            }
-            catch (CDCWriteException e)
-            {
-                // expected, do nothing
-            }
+        testWithCDCSpaceInMb(32, () -> {
+            createTableAndBulkWrite();
             expectCurrentCDCState(CDCState.FORBIDDEN);
 
            // Confirm we can create a non-cdc table and write to it even while at cdc capacity
@@ -97,6 +74,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
             execute("INSERT INTO %s (idx, data) VALUES (1, '1');");
 
             // Confirm that, on flush+recyle, we see files show up in cdc_raw
+            CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
             Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush();
             CommitLog.instance.forceRecycleAllSegments();
             cdcMgr.awaitManagementTasksCompletion();
@@ -109,57 +87,55 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
            // Update size tracker to reflect deleted files. Should flip flag on current allocatingFrom to allow.
             cdcMgr.updateCDCTotalSize();
             expectCurrentCDCState(CDCState.PERMITTED);
-        }
-        finally
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
-        }
+        });
     }
 
     @Test
     public void testSegmentFlaggingOnCreation() throws Throwable
     {
-        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
-        String ct = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
-
-        int origSize = DatabaseDescriptor.getCDCSpaceInMB();
-        try
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(16);
-            TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata();
-            // Spin until we hit CDC capacity and make sure we get a CDCWriteException
-            try
-            {
-                for (int i = 0; i < 1000; i++)
-                {
-                    new RowUpdateBuilder(ccfm, 0, i)
-                        .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
-                        .build().apply();
-                }
-                Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
-            }
-            catch (CDCWriteException e) { }
-
-            expectCurrentCDCState(CDCState.FORBIDDEN);
-            CommitLog.instance.forceRecycleAllSegments();
+        testSegmentFlaggingOnCreation0();
+    }
 
-            cdcMgr.awaitManagementTasksCompletion();
-            // Delete all files in cdc_raw
-            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList())
-                f.tryDelete();
-            cdcMgr.updateCDCTotalSize();
-            // Confirm cdc update process changes flag on active segment
-            expectCurrentCDCState(CDCState.PERMITTED);
+    @Test
+    public void testSegmentFlaggingWithNonblockingOnCreation() throws Throwable
+    {
+        testWithNonblockingMode(this::testSegmentFlaggingOnCreation0);
+    }
 
-            // Clear out archived CDC files
-            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
-                FileUtils.deleteWithConfirm(f);
-            }
-        }
-        finally
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(origSize);
+    @Test
+    public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable
+    {
+        final int commitlogSize = DatabaseDescriptor.getCommitLogSegmentSize() / 1024 / 1024;
+        final int cdcSizeLimit = commitlogSize + 1;
+        // Clear out all CDC files
+        for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
+            FileUtils.deleteWithConfirm(f);
         }
+        testWithNonblockingMode(() -> testWithCDCSpaceInMb(cdcSizeLimit, () -> {
+            CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+            Assert.assertEquals(0, cdcMgr.updateCDCTotalSize());
+
+            createTableAndBulkWrite();
+
+            // Only the current commit log will be kept.
+            // The older ones are deleted immediately on creating a new segment due to exceeding the size limit.
+            long actualSize = cdcMgr.updateCDCTotalSize();
+            Assert.assertTrue(actualSize <= cdcSizeLimit * 1024 * 1024);
+            Assert.assertTrue(actualSize >= DatabaseDescriptor.getCommitLogSegmentSize());
+        }));
+    }
+
+    @Test // switch from blocking to nonblocking, then back to blocking
+    public void testSwitchingCDCWriteModes() throws Throwable
+    {
+        String tableName = createTableAndBulkWrite();
+        expectCurrentCDCState(CDCState.FORBIDDEN);
+        testWithNonblockingMode(() -> {
+            bulkWrite(tableName);
+            expectCurrentCDCState(CDCState.CONTAINS);
+        });
+        bulkWrite(tableName);
+        expectCurrentCDCState(CDCState.FORBIDDEN);
     }
 
     @Test
@@ -187,30 +163,12 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     }
 
     @Test
-    public void testCompletedFlag() throws IOException
+    public void testCompletedFlag() throws Throwable
     {
-        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        String tableName = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
         CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
-        Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
 
-        DatabaseDescriptor.setCDCSpaceInMB(8);
-        try
-        {
-            for (int i = 0; i < 1000; i++)
-            {
-                new RowUpdateBuilder(currentTableMetadata(), 0, 1)
-                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
-                .build().apply();
-            }
-        }
-        catch (CDCWriteException ce)
-        {
-            // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
-        }
-        finally
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
-        }
+        testWithCDCSpaceInMb(8, () -> bulkWrite(tableName));
 
         CommitLog.instance.forceRecycleAllSegments();
 
@@ -280,32 +238,10 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     }
 
     @Test
-    public void testReplayLogic() throws IOException
+    public void testReplayLogic() throws Throwable
     {
-        // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
-        String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
-        Integer originalCDCSize = DatabaseDescriptor.getCDCSpaceInMB();
-
-        DatabaseDescriptor.setCDCSpaceInMB(8);
-        TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
-        try
-        {
-            for (int i = 0; i < 1000; i++)
-            {
-                new RowUpdateBuilder(ccfm, 0, i)
-                    .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
-                    .build().apply();
-            }
-            Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
-        }
-        catch (CDCWriteException e)
-        {
-            // pass
-        }
-        finally
-        {
-            DatabaseDescriptor.setCDCSpaceInMB(originalCDCSize);
-        }
+        // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).tryList().length);
+        testWithCDCSpaceInMb(8, this::createTableAndBulkWrite);
 
         CommitLog.instance.sync(true);
         CommitLog.instance.stopUnsafe(false);
@@ -449,4 +385,102 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
                         expectedState, currentState));
         }
     }
+
+    private void testWithNonblockingMode(Testable test) throws Throwable
+    {
+        boolean original = DatabaseDescriptor.getCDCBlockWrites();
+        CommitLog.instance.setCDCBlockWrites(false);
+        try
+        {
+            test.run();
+        }
+        finally
+        {
+            CommitLog.instance.setCDCBlockWrites(original);
+        }
+    }
+
+    private void testWithCDCSpaceInMb(int size, Testable test) throws Throwable
+    {
+        int origSize = DatabaseDescriptor.getCDCSpaceInMB();
+        DatabaseDescriptor.setCDCSpaceInMB(size);
+        try
+        {
+            test.run();
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCSpaceInMB(origSize);
+        }
+    }
+
+    private String createTableAndBulkWrite() throws Throwable
+    {
+        String tableName = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        bulkWrite(tableName);
+        return tableName;
+    }
+
+    private void bulkWrite(String tableName) throws Throwable
+    {
+        TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(tableName).metadata();
+        boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
+        // Spin to make sure we hit CDC capacity
+        try
+        {
+            for (int i = 0; i < 1000; i++)
+            {
+                new RowUpdateBuilder(ccfm, 0, i)
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .build().applyFuture().get();
+            }
+            if (blockWrites)
+                Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
+        }
+        catch (CDCWriteException e)
+        {
+            if (!blockWrites)
+                Assert.fail("Expected no CDCWriteException when not blocking writes but received it.");
+        }
+    }
+
+    private void testSegmentFlaggingOnCreation0() throws Throwable
+    {
+        testWithCDCSpaceInMb(16, () -> {
+            boolean blockWrites = DatabaseDescriptor.getCDCBlockWrites();
+
+            createTableAndBulkWrite();
+
+            CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
+            expectCurrentCDCState(blockWrites ? CDCState.FORBIDDEN : CDCState.CONTAINS);
+
+            // When blocking writes, releasing CDC commit logs should update the CDC state to PERMITTED
+            if (blockWrites)
+            {
+                CommitLog.instance.forceRecycleAllSegments();
+
+                cdcMgr.awaitManagementTasksCompletion();
+                // Delete all files in cdc_raw
+                for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList())
+                    f.delete();
+                cdcMgr.updateCDCTotalSize();
+                // Confirm cdc update process changes flag on active segment
+                expectCurrentCDCState(CDCState.PERMITTED);
+            }
+
+            // Clear out archived CDC files
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).tryList()) {
+                FileUtils.deleteWithConfirm(f);
+            }
+        });
+    }
+
+    private interface Testable
+    {
+        void run() throws Throwable;
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
