Repository: cassandra
Updated Branches:
  refs/heads/trunk 4838e81a6 -> e9da85723


Make CDC availability more deterministic via hard-linking

Patch by jmckenzie; reviewed by blambov for CASSANDRA-12148


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e9da8572
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e9da8572
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e9da8572

Branch: refs/heads/trunk
Commit: e9da85723a8dd40872c4bca087a03b655bd2cacb
Parents: 4838e81
Author: Josh McKenzie <jmcken...@apache.org>
Authored: Tue Jul 12 12:53:55 2016 -0400
Committer: Josh McKenzie <jmcken...@apache.org>
Committed: Thu Aug 24 13:24:00 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   6 +-
 doc/source/operating/cdc.rst                    |  31 +-
 .../cassandra/config/DatabaseDescriptor.java    |   4 +
 .../AbstractCommitLogSegmentManager.java        |  20 +-
 .../cassandra/db/commitlog/CommitLog.java       |   8 +-
 .../db/commitlog/CommitLogDescriptor.java       |   5 +
 .../db/commitlog/CommitLogPosition.java         |   1 +
 .../db/commitlog/CommitLogReadHandler.java      |   2 +-
 .../cassandra/db/commitlog/CommitLogReader.java |  10 +-
 .../db/commitlog/CommitLogReplayer.java         |  57 ++-
 .../db/commitlog/CommitLogSegment.java          |  60 +++-
 .../commitlog/CommitLogSegmentManagerCDC.java   |  77 ++--
 .../CommitLogSegmentManagerStandard.java        |  14 +-
 .../cassandra/exceptions/CDCWriteException.java |  26 ++
 .../cassandra/exceptions/ExceptionCode.java     |  19 +-
 .../transport/messages/ErrorMessage.java        |   5 +
 test/conf/cassandra.yaml                        |   2 +
 test/conf/cdc.yaml                              |   3 +
 .../cassandra/db/commitlog/CDCTestReplayer.java |  76 ++++
 .../CommitLogSegmentManagerCDCTest.java         | 351 +++++++++++++++----
 21 files changed, 617 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f6fa48..cdcfe25 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,7 @@
  * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
  * Default for start_native_transport now true if not set in config 
(CASSANDRA-13656)
  * Don't add localhost to the graph when calculating where to stream from 
(CASSANDRA-13583)
+ * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
  * Allow skipping equality-restricted clustering columns in ORDER BY clause 
(CASSANDRA-10271)
  * Use common nowInSec for validation compactions (CASSANDRA-13671)
  * Improve handling of IR prepare failures (CASSANDRA-13672)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 253d773..2038342 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,7 +25,11 @@ New features
    - Support for arithmetic operations on number has been added. See 
CASSANDRA-11935
    - Preview expected streaming required for a repair (nodetool repair 
--preview), and validate the
      consistency of repaired data between nodes (nodetool repair --validate). 
See CASSANDRA-13257
-   - Support for selecting Map values and Set elements has been added for 
SELECT queries. See CASSANDRA-7396 
+   - Support for selecting Map values and Set elements has been added for 
SELECT queries. See CASSANDRA-7396
+   - Change-Data-Capture has been modified to make CommitLogSegments available
+     immediately upon creation via hard-linking the files. This means that 
incomplete
+     segments will be available in cdc_raw rather than only fully flushed ones. See 
documentation
+     and CASSANDRA-12148 for more detail.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/doc/source/operating/cdc.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/cdc.rst b/doc/source/operating/cdc.rst
index 192f62a..a7177b5 100644
--- a/doc/source/operating/cdc.rst
+++ b/doc/source/operating/cdc.rst
@@ -23,18 +23,26 @@ Overview
 ^^^^^^^^
 
 Change data capture (CDC) provides a mechanism to flag specific tables for 
archival as well as rejecting writes to those
-tables once a configurable size-on-disk for the combined flushed and unflushed 
CDC-log is reached. An operator can
-enable CDC on a table by setting the table property ``cdc=true`` (either when 
:ref:`creating the table
-<create-table-statement>` or :ref:`altering it <alter-table-statement>`), 
after which any CommitLogSegments containing
-data for a CDC-enabled table are moved to the directory specified in 
``cassandra.yaml`` on segment discard. A threshold
-of total disk space allowed is specified in the yaml at which time newly 
allocated CommitLogSegments will not allow CDC
-data until a consumer parses and removes data from the destination archival 
directory.
+tables once a configurable size-on-disk for the CDC log is reached. An 
operator can enable CDC on a table by setting the
+table property ``cdc=true`` (either when :ref:`creating the table 
<create-table-statement>` or
+:ref:`altering it <alter-table-statement>`). Upon CommitLogSegment creation, a 
hard-link to the segment is created in the
+directory specified in ``cassandra.yaml``. On segment fsync to disk, if CDC 
data is present anywhere in the segment, a
+<segment_name>_cdc.idx file is also created, recording the integer offset of how 
much data in the original segment is persisted
+to disk. Upon final segment flush, a second line with the human-readable word 
"COMPLETED" will be added to the _cdc.idx
+file indicating that Cassandra has completed all processing on the file.
+
+We use an index file rather than encouraging clients to parse the log in real 
time off a memory-mapped handle, as data
+can be reflected in a kernel buffer that is not yet persisted to disk. Parsing 
only up to the listed offset in the _cdc.idx
+file ensures that you only parse CDC data that is durable.
+
+A threshold of total allowed disk space is specified in the yaml; once reached, 
newly allocated CommitLogSegments will
+not allow CDC data until a consumer parses and removes files from the 
specified cdc_raw directory.
 
 Configuration
 ^^^^^^^^^^^^^
 
-Enabling or disable CDC on a table
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Enabling or disabling CDC on a table
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 CDC is enabled or disabled through the `cdc` table property, for instance::
 
@@ -64,7 +72,7 @@ The following `cassandra.yaml` are available for CDC:
 
 Reading CommitLogSegments
 ^^^^^^^^^^^^^^^^^^^^^^^^^
-This implementation included a refactor of CommitLogReplayer into 
`CommitLogReader.java
+Use a `CommitLogReader.java
 
<https://github.com/apache/cassandra/blob/e31e216234c6b57a531cae607e0355666007deb2/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java>`__.
 Usage is `fairly straightforward
 
<https://github.com/apache/cassandra/blob/e31e216234c6b57a531cae607e0355666007deb2/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java#L132-L140>`__
@@ -78,12 +86,11 @@ Warnings
 
 **Do not enable CDC without some kind of consumption process in-place.**
 
-The initial implementation of Change Data Capture does not include a parser 
(see :ref:`reading-commitlogsegments` above)
-so, if CDC is enabled on a node and then on a table, the 
``cdc_free_space_in_mb`` will fill up and then writes to
+If CDC is enabled on a node and then on a table, the ``cdc_free_space_in_mb`` 
will fill up and then writes to
 CDC-enabled tables will be rejected unless some consumption process is in 
place.
 
 Further Reading
 ^^^^^^^^^^^^^^^
 
-- `Design doc 
<https://docs.google.com/document/d/1ZxCWYkeZTquxsvf5hdPc0fiUnUHna8POvgt6TIzML4Y/edit>`__
 - `JIRA ticket <https://issues.apache.org/jira/browse/CASSANDRA-8844>`__
+- `JIRA ticket <https://issues.apache.org/jira/browse/CASSANDRA-12148>`__
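[Editorial note] The `_cdc.idx` format introduced by this patch (a first line holding the durable offset, then an optional `COMPLETED` line after the final flush) is simple enough that a consumer can parse it in a few lines of Java. The sketch below is illustrative and not part of the patch; the class name and file names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class CdcIndexParser
{
    /** Durable offset and completion flag parsed from a <segment>_cdc.idx file. */
    public static final class IdxState
    {
        public final int durableOffset;
        public final boolean completed;

        IdxState(int durableOffset, boolean completed)
        {
            this.durableOffset = durableOffset;
            this.completed = completed;
        }
    }

    /** First line is the synced offset; a second "COMPLETED" line means the segment is final. */
    public static IdxState parse(Path idxFile) throws IOException
    {
        List<String> lines = Files.readAllLines(idxFile);
        int offset = Integer.parseInt(lines.get(0).trim());
        boolean completed = lines.size() > 1 && "COMPLETED".equals(lines.get(1).trim());
        return new IdxState(offset, completed);
    }

    public static void main(String[] args) throws IOException
    {
        Path idx = Files.createTempFile("CommitLog-7-123", "_cdc.idx");
        Files.write(idx, "4096\nCOMPLETED\n".getBytes());
        IdxState state = parse(idx);
        System.out.println(state.durableOffset + " " + state.completed); // prints "4096 true"
    }
}
```

A consumer that stops reading the hard-linked segment at `durableOffset` never observes bytes that are only in a kernel buffer.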

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a839224..f06b8e1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -441,6 +441,10 @@ public class DatabaseDescriptor
             conf.cdc_raw_directory = storagedirFor("cdc_raw");
         }
 
+        // Memory-mapped CommitLog files on Windows are incompatible with CDC as 
we hard-link files in cdc_raw. Confirm we don't have both enabled.
+        if (FBUtilities.isWindows && conf.cdc_enabled && 
conf.commitlog_compression == null)
+            throw new ConfigurationException("Cannot enable cdc on Windows 
with uncompressed commitlog.");
+
         if (conf.commitlog_total_space_in_mb == null)
         {
             int preferredSize = 8192;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 808ef37..42fffc4 100755
--- 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.schema.TableId;
@@ -180,19 +181,12 @@ public abstract class AbstractCommitLogSegmentManager
         }
     }
 
-
     /**
      * Allocate a segment within this CLSM. Should either succeed or throw.
      */
     public abstract Allocation allocate(Mutation mutation, int size);
 
     /**
-     * The recovery and replay process replays mutations into memtables and 
flushes them to disk. Individual CLSM
-     * decide what to do with those segments on disk after they've been 
replayed.
-     */
-    abstract void handleReplayedSegment(final File file);
-
-    /**
      * Hook to allow segment managers to track state surrounding creation of 
new segments. Only perform as a task submitted
     * to the segment manager so it's performed on the segment management thread.
      */
@@ -332,6 +326,18 @@ public abstract class AbstractCommitLogSegmentManager
     }
 
     /**
+     * Delete untracked segment files after replay
+     *
+     * @param file segment file that is no longer in use.
+     */
+    void handleReplayedSegment(final File file)
+    {
+        // (don't decrease managed size, since this was never a "live" segment)
+        logger.trace("(Unopened) segment {} is no longer needed and will be 
deleted now", file);
+        FileUtils.deleteWithConfirm(file);
+    }
+
+    /**
      * Adjust the tracked on-disk size. Called by individual segments to 
reflect writes, allocations and discards.
      * @param addedSize
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index e93a131..a4978b1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.exceptions.CDCWriteException;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
@@ -239,9 +239,9 @@ public class CommitLog implements CommitLogMBean
      * Add a Mutation to the commit log. If CDC is enabled, this can fail.
      *
      * @param mutation the Mutation to add to the log
-     * @throws WriteTimeoutException
+     * @throws CDCWriteException
      */
-    public CommitLogPosition add(Mutation mutation) throws 
WriteTimeoutException
+    public CommitLogPosition add(Mutation mutation) throws CDCWriteException
     {
         assert mutation != null;
 
@@ -431,6 +431,7 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
+     * FOR TESTING PURPOSES
      */
     public void stopUnsafe(boolean deleteSegments)
     {
@@ -448,7 +449,6 @@ public class CommitLog implements CommitLogMBean
         if (DatabaseDescriptor.isCDCEnabled() && deleteSegments)
             for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
                 FileUtils.deleteWithConfirm(f);
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index dd9414a..700f12a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -219,6 +219,11 @@ public class CommitLogDescriptor
         return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
     }
 
+    public String cdcIndexFileName()
+    {
+        return FILENAME_PREFIX + version + SEPARATOR + id + "_cdc.idx";
+    }
+
     /**
      * @param   filename  the filename to check
      * @return true if filename could be a commit log based on it's filename

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
index 84054a4..3ffb04c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogPosition.java
@@ -40,6 +40,7 @@ public class CommitLogPosition implements 
Comparable<CommitLogPosition>
     public static final CommitLogPosition NONE = new CommitLogPosition(-1, 0);
 
     public final long segmentId;
+    // Indicates the end position of the mutation in the CommitLog
     public final int position;
 
     public static final Comparator<CommitLogPosition> comparator = new 
Comparator<CommitLogPosition>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
index 0602147..ee05235 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReadHandler.java
@@ -69,7 +69,7 @@ public interface CommitLogReadHandler
      *
      * @param m deserialized mutation
      * @param size serialized size of the mutation
-     * @param entryLocation filePointer offset inside the CommitLogSegment for 
the record
+     * @param entryLocation filePointer offset inside the CommitLogSegment for 
the end of the record
      * @param desc CommitLogDescriptor for mutation being processed
      */
     void handleMutation(Mutation m, int size, int entryLocation, 
CommitLogDescriptor desc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 5fb1d5d..301c832 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -96,6 +96,14 @@ public class CommitLogReader
     }
 
     /**
+     * Reads all mutations from passed in file from minPosition
+     */
+    public void readCommitLogSegment(CommitLogReadHandler handler, File file, 
CommitLogPosition minPosition, boolean tolerateTruncation) throws IOException
+    {
+        readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, 
tolerateTruncation);
+    }
+
+    /**
      * Reads passed in file fully, up to mutationLimit count
      */
     @VisibleForTesting
@@ -357,7 +365,7 @@ public class CommitLogReader
      * @param inputBuffer raw byte array w/Mutation data
      * @param size deserialized size of mutation
      * @param minPosition We need to suppress replay of mutations that are 
before the required minPosition
-     * @param entryLocation filePointer offset of mutation within 
CommitLogSegment
+     * @param entryLocation filePointer offset of end of mutation within 
CommitLogSegment
      * @param desc CommitLogDescriptor being worked on
      */
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 961107c..d1e63e6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -34,17 +34,20 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -72,6 +75,9 @@ public class CommitLogReplayer implements CommitLogReadHandler
     private final CommitLogArchiver archiver;
 
     @VisibleForTesting
+    protected boolean sawCDCMutation;
+
+    @VisibleForTesting
     protected CommitLogReader commitLogReader;
 
     CommitLogReplayer(CommitLog commitLog,
@@ -130,14 +136,52 @@ public class CommitLogReplayer implements 
CommitLogReadHandler
 
     public void replayPath(File file, boolean tolerateTruncation) throws 
IOException
     {
+        sawCDCMutation = false;
         commitLogReader.readCommitLogSegment(this, file, globalPosition, 
CommitLogReader.ALL_MUTATIONS, tolerateTruncation);
+        if (sawCDCMutation)
+            handleCDCReplayCompletion(file);
     }
 
     public void replayFiles(File[] clogs) throws IOException
     {
-        commitLogReader.readAllFiles(this, clogs, globalPosition);
+        for (int i = 0; i < clogs.length; i++)
+        {
+            sawCDCMutation = false;
+            commitLogReader.readCommitLogSegment(this, clogs[i], 
globalPosition, i == clogs.length - 1);
+            if (sawCDCMutation)
+                handleCDCReplayCompletion(clogs[i]);
+        }
     }
 
+
+    /**
+     * Upon replay completion, CDC needs to hard-link files in the CDC folder 
and calculate index files so consumers can
+     * begin their work.
+     */
+    private void handleCDCReplayCompletion(File f) throws IOException
+    {
+        // Can only reach this point if CDC is enabled, thus we have a 
CDCSegmentManager
+        
((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).addCDCSize(f.length());
+
+        File dest = new File(DatabaseDescriptor.getCDCLogLocation(), 
f.getName());
+
+        // If hard link already exists, assume it's from a previous node run. 
If people are mucking around in the cdc_raw
+        // directory that's on them.
+        if (!dest.exists())
+            FileUtils.createHardLink(f, dest);
+
+        // The reader has already verified we can deserialize the descriptor.
+        CommitLogDescriptor desc;
+        try(RandomAccessReader reader = RandomAccessReader.open(f))
+        {
+            desc = CommitLogDescriptor.readHeader(reader, 
DatabaseDescriptor.getEncryptionContext());
+            assert desc != null;
+            assert f.length() < Integer.MAX_VALUE;
+            CommitLogSegment.writeCDCIndexFile(desc, (int)f.length(), true);
+        }
+    }
+
+
     /**
      * Flushes all keyspaces associated with this replayer in parallel, 
blocking until their flushes are complete.
      * @return the number of mutations replayed
@@ -366,6 +410,9 @@ public class CommitLogReplayer implements 
CommitLogReadHandler
 
     public void handleMutation(Mutation m, int size, int entryLocation, 
CommitLogDescriptor desc)
     {
+        if (DatabaseDescriptor.isCDCEnabled() && m.trackedByCDC())
+            sawCDCMutation = true;
+
         pendingMutationBytes += size;
         futures.offer(mutationInitiator.initiateMutation(m,
                                                          desc.id,
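[Editorial note] The replayer's `handleCDCReplayCompletion` above hard-links the replayed segment into the cdc_raw directory via Cassandra's `FileUtils.createHardLink`, which on the JVM comes down to `java.nio.file.Files.createLink`. A standalone sketch of the idea, with hypothetical names and the same pre-existing-link tolerance as the patch:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HardLinkSketch
{
    /**
     * Link segment into cdcRawDir under the same file name, tolerating a
     * pre-existing link left over from a previous run of the node.
     */
    public static Path linkIntoCdcRaw(Path segment, Path cdcRawDir) throws IOException
    {
        Path dest = cdcRawDir.resolve(segment.getFileName());
        if (!Files.exists(dest))
            Files.createLink(dest, segment); // hard link: both names refer to the same inode
        return dest;
    }
}
```

Because a hard link shares the underlying data with the original name, the segment becomes visible in cdc_raw immediately upon creation, with no copy and no extra disk usage (both paths must live on the same filesystem).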

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5edf72b..8dd84e0 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -28,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import com.codahale.metrics.Timer;
@@ -92,7 +94,8 @@ public abstract class CommitLogSegment
     // Everything before this offset has been synced and written.  The 
SYNC_MARKER_SIZE bytes after
     // each sync are reserved, and point forwards to the next such offset.  
The final
     // sync marker in a segment will be zeroed out, or point to a position too 
close to the EOF to fit a marker.
-    private volatile int lastSyncedOffset;
+    @VisibleForTesting
+    volatile int lastSyncedOffset;
 
     // The end position of the buffer. Initially set to its capacity and 
updated to point to the last written position
     // as the segment is being closed.
@@ -212,7 +215,10 @@ public abstract class CommitLogSegment
                 opGroup.close();
                 return null;
             }
-            markDirty(mutation, position);
+
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
+                coverInMap(tableDirty, update.metadata().id, position);
+
             return new Allocation(this, opGroup, position, (ByteBuffer) 
buffer.duplicate().position(position).limit(position + size));
         }
         catch (Throwable t)
@@ -333,6 +339,9 @@ public abstract class CommitLogSegment
         // Possibly perform compression or encryption, writing to file and 
flush.
         write(startMarker, sectionEnd);
 
+        if (cdcState == CDCState.CONTAINS)
+            writeCDCIndexFile(descriptor, sectionEnd, close);
+
         // Signal the sync as complete.
         lastSyncedOffset = nextMarker;
         if (close)
@@ -341,6 +350,27 @@ public abstract class CommitLogSegment
     }
 
     /**
+     * We persist the offset of the last data synced to disk so clients can 
parse only durable data if they choose. Data
+     * in shared / memory-mapped buffers reflects un-synced data so we need an 
external sentinel for clients to read to
+     * determine how much data has actually been durably persisted.
+     */
+    public static void writeCDCIndexFile(CommitLogDescriptor desc, int offset, 
boolean complete)
+    {
+        try(FileWriter writer = new FileWriter(new 
File(DatabaseDescriptor.getCDCLogLocation(), desc.cdcIndexFileName())))
+        {
+            writer.write(String.valueOf(offset));
+            if (complete)
+                writer.write("\nCOMPLETED");
+            writer.flush();
+        }
+        catch (IOException e)
+        {
+            if (!CommitLog.instance.handleCommitError("Failed to sync CDC 
Index: " + desc.cdcIndexFileName(), e))
+                throw new RuntimeException(e);
+        }
+    }
+
+    /**
      * Create a sync marker to delineate sections of the commit log, typically 
created on each sync of the file.
      * The sync marker consists of a file pointer to where the next sync 
marker should be (effectively declaring the length
      * of this section), as well as a CRC value.
@@ -405,6 +435,22 @@ public abstract class CommitLogSegment
         return logFile.getName();
     }
 
+    /**
+     * @return a File object representing the CDC directory and this file name 
for hard-linking
+     */
+    public File getCDCFile()
+    {
+        return new File(DatabaseDescriptor.getCDCLogLocation(), 
logFile.getName());
+    }
+
+    /**
+     * @return a File object representing the CDC Index file holding the 
offset and completion status of this segment
+     */
+    public File getCDCIndexFile()
+    {
+        return new File(DatabaseDescriptor.getCDCLogLocation(), 
descriptor.cdcIndexFileName());
+    }
+
     void waitForFinalSync()
     {
         while (true)
@@ -475,12 +521,6 @@ public abstract class CommitLogSegment
         i.expandToCover(value);
     }
 
-    void markDirty(Mutation mutation, int allocatedPosition)
-    {
-        for (PartitionUpdate update : mutation.getPartitionUpdates())
-            coverInMap(tableDirty, update.metadata().id, allocatedPosition);
-    }
-
     /**
      * Marks the ColumnFamily specified by id as clean for this log segment. 
If the
      * given context argument is contained in this file, it will only mark the 
CF as
@@ -623,6 +663,7 @@ public abstract class CommitLogSegment
         // Also synchronized in CDCSizeTracker.processNewSegment and 
.processDiscardedSegment
         synchronized(cdcStateLock)
         {
+            // The duplicate CONTAINS -> CONTAINS transition must be idempotent 
since two threads can race on this lock
             if (cdcState == CDCState.CONTAINS && newState != CDCState.CONTAINS)
                 throw new IllegalArgumentException("Cannot transition from 
CONTAINS to any other state.");
 
@@ -673,6 +714,9 @@ public abstract class CommitLogSegment
             segment.waitForSync(position, waitingOnCommit);
         }
 
+        /**
+         * Returns the position in the CommitLogSegment at the end of this 
allocation.
+         */
         public CommitLogPosition getCommitLogPosition()
         {
             return new CommitLogPosition(segment.id, buffer.limit());
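[Editorial note] Since `writeCDCIndexFile` above rewrites the idx file on every sync, a consumer can poll it and process only the newly durable bytes of the hard-linked segment, stopping once the `COMPLETED` sentinel appears. The file layout follows the patch; the consumer loop itself is an illustrative sketch with hypothetical names.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class DurableTailer
{
    /**
     * Read bytes [from, durableOffset) of the segment as reported by the idx
     * file; return the new high-water mark for the next poll.
     */
    public static int consumeDurable(Path segment, Path idx, int from) throws IOException
    {
        List<String> lines = Files.readAllLines(idx);
        int durable = Integer.parseInt(lines.get(0).trim());
        if (durable > from)
        {
            byte[] chunk = new byte[durable - from];
            try (RandomAccessFile raf = new RandomAccessFile(segment.toFile(), "r"))
            {
                raf.seek(from);
                raf.readFully(chunk);
            }
            // hand chunk off to a real CommitLogReader-based parser here
        }
        return durable;
    }

    /** True once Cassandra has appended the COMPLETED line after the final flush. */
    public static boolean isCompleted(Path idx) throws IOException
    {
        List<String> lines = Files.readAllLines(idx);
        return lines.size() > 1 && "COMPLETED".equals(lines.get(1).trim());
    }
}
```

A real consumer would deserialize the chunk with `CommitLogReader` rather than read raw bytes; the point is that the idx file, not the segment length, bounds each read.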

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index a91384f..4d31aad 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -32,9 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.exceptions.CDCWriteException;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
 import org.apache.cassandra.utils.NoSpamLogger;
@@ -64,12 +64,20 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
 
         cdcSizeTracker.processDiscardedSegment(segment);
 
-        if (segment.getCDCState() == CDCState.CONTAINS)
-            FileUtils.renameWithConfirm(segment.logFile.getAbsolutePath(), 
DatabaseDescriptor.getCDCLogLocation() + File.separator + 
segment.logFile.getName());
-        else
+        if (delete)
+            FileUtils.deleteWithConfirm(segment.logFile);
+
+        if (segment.getCDCState() != CDCState.CONTAINS)
         {
-            if (delete)
-                FileUtils.deleteWithConfirm(segment.logFile);
+            // Always delete hard-link from cdc folder if this segment didn't 
contain CDC data. Note: File may not exist
+            // if processing discard during startup.
+            File cdcLink = segment.getCDCFile();
+            if (cdcLink.exists())
+                FileUtils.deleteWithConfirm(cdcLink);
+
+            File cdcIndexFile = segment.getCDCIndexFile();
+            if (cdcIndexFile.exists())
+                FileUtils.deleteWithConfirm(cdcIndexFile);
         }
     }
 
@@ -89,10 +97,10 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
      * @param mutation Mutation to allocate in segment manager
      * @param size total size (overhead + serialized) of mutation
      * @return the created Allocation object
-     * @throws WriteTimeoutException If segment disallows CDC mutations, we 
throw WTE
+     * @throws CDCWriteException if the segment disallows CDC mutations
      */
     @Override
-    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) 
throws WriteTimeoutException
+    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) 
throws CDCWriteException
     {
         CommitLogSegment segment = allocatingFrom();
         CommitLogSegment.Allocation alloc;
@@ -113,44 +121,46 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
         return alloc;
     }
 
-    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws WriteTimeoutException
+    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws CDCWriteException
     {
         if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN)
         {
             cdcSizeTracker.submitOverflowSizeRecalculation();
+            String logMsg = String.format("Rejecting mutation to keyspace %s. Free up space in %s by processing CDC logs.",
+                mutation.getKeyspaceName(), DatabaseDescriptor.getCDCLogLocation());
             NoSpamLogger.log(logger,
                              NoSpamLogger.Level.WARN,
                              10,
                              TimeUnit.SECONDS,
-                             "Rejecting Mutation containing CDC-enabled table. Free up space in {}.",
-                             DatabaseDescriptor.getCDCLogLocation());
-            throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                             logMsg);
+            throw new CDCWriteException(logMsg);
         }
     }
 
     /**
-     * Move files to cdc_raw after replay, since recovery will flush to SSTable and these mutations won't be available
-     * in the CL subsystem otherwise.
-     */
-    void handleReplayedSegment(final File file)
-    {
-        logger.trace("Moving (Unopened) segment {} to cdc_raw directory after replay", file);
-        FileUtils.renameWithConfirm(file.getAbsolutePath(), DatabaseDescriptor.getCDCLogLocation() + File.separator + file.getName());
-        cdcSizeTracker.addFlushedSize(file.length());
-    }
-
-    /**
      * On segment creation, flag whether the segment should accept CDC mutations or not based on the total currently
      * allocated unflushed CDC segments and the contents of cdc_raw
      */
     public CommitLogSegment createSegment()
     {
         CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this);
+
+        // Hard link file in cdc folder for realtime tracking
+        FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
+
         cdcSizeTracker.processNewSegment(segment);
         return segment;
     }
 
     /**
+     * For use after replay when replayer hard-links / adds tracking of replayed segments
+     */
+    public void addCDCSize(long size)
+    {
+        cdcSizeTracker.addSize(size);
+    }
+
+    /**
      * Tracks total disk usage of CDC subsystem, defined by the summation of all unflushed CommitLogSegments with CDC
      * data in them and all segments archived into cdc_raw.
      *
@@ -162,7 +172,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval());
         private ExecutorService cdcSizeCalculationExecutor;
         private CommitLogSegmentManagerCDC segmentManager;
-        private volatile long unflushedCDCSize;
 
         // Used instead of size during walk to remove chance of over-allocation
         private volatile long sizeInProgress = 0;
@@ -179,7 +188,6 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
         public void start()
         {
             size = 0;
-            unflushedCDCSize = 0;
             cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
         }
 
@@ -202,7 +210,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
                 if (segment.getCDCState() == CDCState.PERMITTED)
-                    unflushedCDCSize += defaultSegmentSize();
+                    size += defaultSegmentSize();
             }
 
             // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
@@ -218,7 +226,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
                 if (segment.getCDCState() == CDCState.CONTAINS)
                     size += segment.onDiskSize();
                 if (segment.getCDCState() != CDCState.FORBIDDEN)
-                    unflushedCDCSize -= defaultSegmentSize();
+                    size -= defaultSegmentSize();
             }
 
             // Take this opportunity to kick off a recalc to pick up any consumer file deletion.
@@ -278,19 +286,20 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
             return FileVisitResult.CONTINUE;
         }
 
-        private void addFlushedSize(long toAdd)
+
+        public void shutdown()
         {
-            size += toAdd;
+            cdcSizeCalculationExecutor.shutdown();
         }
 
-        private long totalCDCSizeOnDisk()
+        private void addSize(long toAdd)
         {
-            return unflushedCDCSize + size;
+            size += toAdd;
         }
 
-        public void shutdown()
+        private long totalCDCSizeOnDisk()
         {
-            cdcSizeCalculationExecutor.shutdown();
+            return size;
         }
     }
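[Editor's note] The core of this patch is that createSegment() above hard-links each new commit log segment into the cdc folder at creation time, rather than renaming it there on discard. A standalone sketch of that strategy (temp-file paths and class name are hypothetical; java.nio.file.Files.createLink stands in for Cassandra's FileUtils.createHardLink):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class HardLinkSketch
{
    public static void main(String[] args) throws IOException
    {
        Path commitlogDir = Files.createTempDirectory("commitlog");
        Path cdcRawDir = Files.createTempDirectory("cdc_raw");

        // A stand-in for the segment file written by the commit log.
        Path segment = Files.write(commitlogDir.resolve("CommitLog-1.log"), "mutations".getBytes());

        // Link (not copy) it into cdc_raw: both names share one inode, so
        // CDC consumers see the data in realtime at no extra disk cost.
        Path cdcLink = cdcRawDir.resolve(segment.getFileName());
        Files.createLink(cdcLink, segment);
        if (!Files.isSameFile(segment, cdcLink))
            throw new AssertionError("link should reference the same file");

        // Discarding the original name leaves the data reachable via the link,
        // which is what makes CDC availability deterministic here.
        Files.delete(segment);
        if (!new String(Files.readAllBytes(cdcLink)).equals("mutations"))
            throw new AssertionError("data should survive deletion of the original name");
        System.out.println("cdc link retains segment data");
    }
}
```

Discarding a non-CDC segment then only has to unlink the cdc-side name, which is the branch handleDiscardedSegment gained above.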
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
index 86e886b..b9bd744 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
@@ -61,19 +61,7 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
         return alloc;
     }
 
-    /**
-     * Simply delete untracked segment files w/standard, as it'll be flushed to sstables during recovery
-     *
-     * @param file segment file that is no longer in use.
-     */
-    void handleReplayedSegment(final File file)
-    {
-        // (don't decrease managed size, since this was never a "live" segment)
-        logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
-        FileUtils.deleteWithConfirm(file);
-    }
-
-    public CommitLogSegment createSegment()
+    public CommitLogSegment createSegment()
     {
         return CommitLogSegment.createSegment(commitLog, this);
     }
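[Editor's note] The CDCSizeTracker changes above consolidate the former unflushedCDCSize counter into the single size field. A self-contained sketch of the resulting accounting (class and method names are hypothetical stand-ins; sizes arbitrary):

```java
public class CDCSizeSketch
{
    enum CDCState { PERMITTED, CONTAINS, FORBIDDEN }

    static final long SEGMENT_SIZE = 32 << 20; // stand-in for defaultSegmentSize()
    static long size = 0;                      // single consolidated counter

    // processNewSegment: a PERMITTED segment reserves one full segment of CDC space.
    static void onNewSegment(CDCState state)
    {
        if (state == CDCState.PERMITTED)
            size += SEGMENT_SIZE;
    }

    // processDiscardedSegment: CONTAINS converts its reservation into the archived
    // file's actual on-disk size; any non-FORBIDDEN segment releases the reservation.
    static void onDiscardedSegment(CDCState state, long onDiskSize)
    {
        if (state == CDCState.CONTAINS)
            size += onDiskSize;
        if (state != CDCState.FORBIDDEN)
            size -= SEGMENT_SIZE;
    }

    public static void main(String[] args)
    {
        onNewSegment(CDCState.PERMITTED);                // reserve a full segment
        onDiscardedSegment(CDCState.CONTAINS, 10 << 20); // archive 10 MB of real CDC data
        if (size != 10 << 20)
            throw new AssertionError("expected 10MB tracked, got " + size);
        System.out.println("total CDC size on disk: " + size);
    }
}
```

With one counter, totalCDCSizeOnDisk() reduces to returning size, as the diff shows.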

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/exceptions/CDCWriteException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/CDCWriteException.java b/src/java/org/apache/cassandra/exceptions/CDCWriteException.java
new file mode 100644
index 0000000..d60c1d3
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/CDCWriteException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+public class CDCWriteException extends RequestExecutionException
+{
+    public CDCWriteException(String msg)
+    {
+        super(ExceptionCode.CDC_WRITE_FAILURE, msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 6ad0577..9324110 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -33,15 +33,16 @@ public enum ExceptionCode
     BAD_CREDENTIALS (0x0100),
 
     // 1xx: problem during request execution
-    UNAVAILABLE     (0x1000),
-    OVERLOADED      (0x1001),
-    IS_BOOTSTRAPPING(0x1002),
-    TRUNCATE_ERROR  (0x1003),
-    WRITE_TIMEOUT   (0x1100),
-    READ_TIMEOUT    (0x1200),
-    READ_FAILURE    (0x1300),
-    FUNCTION_FAILURE(0x1400),
-    WRITE_FAILURE   (0x1500),
+    UNAVAILABLE         (0x1000),
+    OVERLOADED          (0x1001),
+    IS_BOOTSTRAPPING    (0x1002),
+    TRUNCATE_ERROR      (0x1003),
+    WRITE_TIMEOUT       (0x1100),
+    READ_TIMEOUT        (0x1200),
+    READ_FAILURE        (0x1300),
+    FUNCTION_FAILURE    (0x1400),
+    WRITE_FAILURE       (0x1500),
+    CDC_WRITE_FAILURE   (0x1600),
 
     // 2xx: problem validating the request
     SYNTAX_ERROR    (0x2000),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index ac4b3dc..9163d56 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -151,6 +151,9 @@ public class ErrorMessage extends Message.Response
                 case CONFIG_ERROR:
                     te = new ConfigurationException(msg);
                     break;
+                case CDC_WRITE_FAILURE:
+                    te = new CDCWriteException(msg);
+                    break;
                 case ALREADY_EXISTS:
                     String ksName = CBUtil.readString(body);
                     String cfName = CBUtil.readString(body);
@@ -306,6 +309,8 @@ public class ErrorMessage extends Message.Response
                    return new WriteTimeoutException(wfe.writeType, wfe.consistency, wfe.received, wfe.blockFor);
                 case FUNCTION_FAILURE:
                     return new InvalidRequestException(msg.toString());
+                case CDC_WRITE_FAILURE:
+                    return new InvalidRequestException(msg.toString());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 97a7e7a..ead2a88 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -9,6 +9,8 @@ commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5
 commitlog_directory: build/test/cassandra/commitlog
+# commitlog_compression:
+# - class_name: LZ4Compressor
 cdc_raw_directory: build/test/cassandra/cdc_raw
 cdc_enabled: false
 hints_directory: build/test/cassandra/hints

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/conf/cdc.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cdc.yaml b/test/conf/cdc.yaml
index f79930a..8fb9427 100644
--- a/test/conf/cdc.yaml
+++ b/test/conf/cdc.yaml
@@ -1 +1,4 @@
 cdc_enabled: true
+# Compression enabled since uncompressed + cdc isn't compatible w/Windows
+commitlog_compression:
+  - class_name: LZ4Compressor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
new file mode 100644
index 0000000..3695da8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+
+/**
+ * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations
+ */
+public class CDCTestReplayer extends CommitLogReplayer
+{
+    private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class);
+
+    public CDCTestReplayer() throws IOException
+    {
+        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
+        CommitLog.instance.sync();
+        commitLogReader = new CommitLogTestReader();
+    }
+
+    public void examineCommitLog() throws IOException
+    {
+        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
+    }
+
+    private class CommitLogTestReader extends CommitLogReader
+    {
+        @Override
+        protected void readMutation(CommitLogReadHandler handler,
+                                    byte[] inputBuffer,
+                                    int size,
+                                    CommitLogPosition minPosition,
+                                    final int entryLocation,
+                                    final CommitLogDescriptor desc) throws IOException
+        {
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            Mutation mutation;
+            try
+            {
+                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+                if (mutation.trackedByCDC())
+                    sawCDCMutation = true;
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+        }
+    }
+}
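[Editor's note] The per-segment cdc index file this patch introduces is what the tests below (testCDCIndexFileWriteOnSync, testCompletedFlag) assert on: the first line holds the last synced offset into the linked segment, and a second line reading COMPLETED is appended once the segment is finished. A standalone sketch of parsing that format (class name and temp-file path are hypothetical):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CDCIndexSketch
{
    static int offset;
    static boolean completed;

    // Line 1: last synced offset; line 2 (optional): "COMPLETED".
    static void parse(Path idx) throws IOException
    {
        try (BufferedReader in = Files.newBufferedReader(idx))
        {
            offset = Integer.parseInt(in.readLine());
            completed = "COMPLETED".equals(in.readLine());
        }
    }

    public static void main(String[] args) throws IOException
    {
        Path idx = Files.createTempFile("CommitLog-6-1_cdc", ".idx");
        Files.write(idx, "1024\nCOMPLETED\n".getBytes());
        parse(idx);
        if (offset != 1024 || !completed)
            throw new AssertionError("unexpected index contents");
        System.out.println("offset=" + offset + " completed=" + completed);
        Files.delete(idx);
    }
}
```

A consumer can poll the index file to learn how far into the hard-linked segment it may safely read, without waiting for the segment to be recycled.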

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e9da8572/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 3ae1ae4..80dfd01 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.cassandra.db.commitlog;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.Random;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
 
 import org.junit.Assert;
 import org.junit.Assume;
@@ -29,18 +30,18 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.exceptions.CDCWriteException;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadata;
 
 public class CommitLogSegmentManagerCDCTest extends CQLTester
 {
-    private static Random random = new Random();
+    private static final Random random = new Random();
 
     @BeforeClass
     public static void checkConfig()
@@ -49,15 +50,17 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     }
 
     @Before
-    public void before() throws IOException
+    public void beforeTest() throws Throwable
     {
-        CommitLog.instance.resetUnsafe(true);
-        for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
-            FileUtils.deleteWithConfirm(f);
+        super.beforeTest();
+        // Need to clean out any files from previous test runs. Prevents flaky test failures.
+        CommitLog.instance.stopUnsafe(true);
+        CommitLog.instance.start();
+        ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).updateCDCTotalSize();
     }
 
     @Test
-    public void testCDCWriteTimeout() throws Throwable
+    public void testCDCWriteFailure() throws Throwable
     {
         createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
         CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
@@ -68,7 +71,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         try
         {
             DatabaseDescriptor.setCDCSpaceInMB(32);
-            // Spin until we hit CDC capacity and make sure we get a WriteTimeout
+            // Spin until we hit CDC capacity and make sure we get a CDCWriteException
             try
             {
                 // Should trigger on anything < 20:1 compression ratio during compressed test
@@ -78,9 +81,9 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
                         .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
                         .build().apply();
                 }
-                Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it.");
+                Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
             }
-            catch (WriteTimeoutException e)
+            catch (CDCWriteException e)
             {
                 // expected, do nothing
             }
@@ -111,45 +114,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
     }
 
     @Test
-    public void testCLSMCDCDiscardLogic() throws Throwable
-    {
-        CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
-
-        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
-        for (int i = 0; i < 8; i++)
-        {
-            new RowUpdateBuilder(currentTableMetadata(), 0, i)
-                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) // fit 3 in a segment
-                .build().apply();
-        }
-
-        // Should have 4 segments CDC since we haven't flushed yet, 3 PERMITTED, one of which is active, and 1 PERMITTED, in waiting
-        Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize());
-        expectCurrentCDCState(CDCState.PERMITTED);
-        CommitLog.instance.forceRecycleAllSegments();
-
-        // on flush, these PERMITTED should be deleted
-        Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
-
-        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
-        for (int i = 0; i < 8; i++)
-        {
-            new RowUpdateBuilder(currentTableMetadata(), 0, i)
-                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
-                .build().apply();
-        }
-        // 4 total again, 3 CONTAINS, 1 in waiting PERMITTED
-        Assert.assertEquals(4 * DatabaseDescriptor.getCommitLogSegmentSize(), cdcMgr.updateCDCTotalSize());
-        CommitLog.instance.forceRecycleAllSegments();
-        expectCurrentCDCState(CDCState.PERMITTED);
-
-        // On flush, PERMITTED is deleted, CONTAINS is preserved.
-        cdcMgr.awaitManagementTasksCompletion();
-        int seen = getCDCRawCount();
-        Assert.assertTrue("Expected >3 files in cdc_raw, saw: " + seen, seen >= 3);
-    }
-
-    @Test
     public void testSegmentFlaggingOnCreation() throws Throwable
     {
         CommitLogSegmentManagerCDC cdcMgr = (CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager;
@@ -160,7 +124,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         {
             DatabaseDescriptor.setCDCSpaceInMB(16);
             TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(ct).metadata();
-            // Spin until we hit CDC capacity and make sure we get a WriteTimeout
+            // Spin until we hit CDC capacity and make sure we get a CDCWriteException
             try
             {
                 for (int i = 0; i < 1000; i++)
@@ -169,15 +133,17 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
                         .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
                         .build().apply();
                 }
-                Assert.fail("Expected WriteTimeoutException from full CDC but did not receive it.");
+                Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
             }
-            catch (WriteTimeoutException e) { }
+            catch (CDCWriteException e) { }
 
             expectCurrentCDCState(CDCState.FORBIDDEN);
             CommitLog.instance.forceRecycleAllSegments();
 
             cdcMgr.awaitManagementTasksCompletion();
-            new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()[0].delete();
+            // Delete all files in cdc_raw
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+                f.delete();
             cdcMgr.updateCDCTotalSize();
             // Confirm cdc update process changes flag on active segment
             expectCurrentCDCState(CDCState.PERMITTED);
@@ -186,12 +152,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
             for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles()) {
                 FileUtils.deleteWithConfirm(f);
             }
-
-            // Set space to 0, confirm newly allocated segments are FORBIDDEN
-            DatabaseDescriptor.setCDCSpaceInMB(0);
-            CommitLog.instance.forceRecycleAllSegments();
-            CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
-            expectCurrentCDCState(CDCState.FORBIDDEN);
         }
         finally
         {
@@ -199,6 +159,259 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         }
     }
 
+    @Test
+    public void testCDCIndexFileWriteOnSync() throws IOException
+    {
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+            .build().apply();
+
+        CommitLog.instance.sync();
+        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+        int syncOffset = currentSegment.lastSyncedOffset;
+
+        // Confirm index file is written
+        File cdcIndexFile = currentSegment.getCDCIndexFile();
+        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+        // Read index value and confirm it's == end from last sync
+        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+        String input = in.readLine();
+        Integer offset = Integer.parseInt(input);
+        Assert.assertEquals(syncOffset, (long)offset);
+        in.close();
+    }
+
+    @Test
+    public void testCompletedFlag() throws IOException
+    {
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
+        DatabaseDescriptor.setCDCSpaceInMB(8);
+        try
+        {
+            for (int i = 0; i < 1000; i++)
+            {
+                new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                .build().apply();
+            }
+        }
+        catch (CDCWriteException ce)
+        {
+            // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
+        }
+
+        CommitLog.instance.forceRecycleAllSegments();
+
+        // Confirm index file is written
+        File cdcIndexFile = initialSegment.getCDCIndexFile();
+        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+        // Read index file and confirm second line is COMPLETED
+        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+        String input = in.readLine();
+        input = in.readLine();
+        Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED"));
+        in.close();
+    }
+
+    @Test
+    public void testDeleteLinkOnDiscardNoCDC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
+        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+            .build().apply();
+        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+
+        // Confirm that, with no CDC data present, we've hard-linked but have no index file
+        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+        File cdcIndexFile = currentSegment.getCDCIndexFile();
+        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+        // Sync and confirm no index written as index is written on flush
+        CommitLog.instance.sync();
+        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+        // Force a full recycle and confirm hard-link is deleted
+        CommitLog.instance.forceRecycleAllSegments();
+        CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+        Assert.assertFalse("Expected hard link to CLS to be deleted on non-cdc segment: " + linked, Files.exists(linked));
+    }
+
+    @Test
+    public void testRetainLinkOnDiscardCDC() throws Throwable
+    {
+        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+        File cdcIndexFile = currentSegment.getCDCIndexFile();
+        Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+            .build().apply();
+
+        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+        // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file
+        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+
+        // Sync and confirm index written as index is written on flush
+        CommitLog.instance.sync();
+        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+        Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists());
+
+        // Force a full recycle and confirm all files remain
+        CommitLog.instance.forceRecycleAllSegments();
+        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+        Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists());
+    }
+
+    @Test
+    public void testReplayLogic() throws IOException
+    {
+        // Assert.assertEquals(0, new 
File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+        String table_name = createTable("CREATE TABLE %s (idx int, data text, 
primary key(idx)) WITH cdc=true;");
+
+        DatabaseDescriptor.setCDCSpaceInMB(8);
+        TableMetadata ccfm = 
Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
+        try
+        {
+            for (int i = 0; i < 1000; i++)
+            {
+                new RowUpdateBuilder(ccfm, 0, i)
+                    .add("data", 
randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+                    .build().apply();
+            }
+            Assert.fail("Expected CDCWriteException from full CDC but did not 
receive it.");
+        }
+        catch (CDCWriteException e)
+        {
+            // pass
+        }
+
+        CommitLog.instance.sync();
+        CommitLog.instance.stopUnsafe(false);
+
+        // Build up a list of expected index files after replay and then clear 
out cdc_raw
+        List<CDCIndexData> oldData = parseCDCIndexData();
+        for (File f : new 
File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+            FileUtils.deleteWithConfirm(f.getAbsolutePath());
+
+        try
+        {
+            Assert.assertEquals("Expected 0 files in CDC folder after 
deletion. ",
+                                0, new 
File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+        }
+        finally
+        {
+            // If we don't have a started commitlog, assertions will cause the 
test to hang. I assume it's some assumption
+            // hang in the shutdown on CQLTester trying to clean up / drop 
keyspaces / tables and hanging applying
+            // mutations.
+            CommitLog.instance.start();
+            CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+        }
+        CDCTestReplayer replayer = new CDCTestReplayer();
+        replayer.examineCommitLog();
+
+        // Rough sanity check: files should be present again now.
+        Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.",
+                          new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0);
+
+        // Confirm all the old indexes are present with offsets >= their original values, since we flag the
+        // entire segment as cdc written on a replay.
+        List<CDCIndexData> newData = parseCDCIndexData();
+        for (CDCIndexData cid : oldData)
+        {
+            boolean found = false;
+            for (CDCIndexData ncid : newData)
+            {
+                if (cid.fileName.equals(ncid.fileName))
+                {
+                    Assert.assertTrue("New CDC index file expected to have an offset >= the old one.", ncid.offset >= cid.offset);
+                    found = true;
+                }
+            }
+            if (!found)
+            {
+                StringBuilder errorMessage = new StringBuilder();
+                errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid));
+                errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n");
+                for (CDCIndexData ncid : newData)
+                    errorMessage.append(String.format("   %s\n", ncid));
+                Assert.fail(errorMessage.toString());
+            }
+        }
+
+        // And make sure we don't have new CDC Indexes we don't expect
+        for (CDCIndexData ncid : newData)
+        {
+            boolean found = false;
+            for (CDCIndexData cid : oldData)
+            {
+                if (cid.fileName.equals(ncid.fileName))
+                    found = true;
+            }
+            if (!found)
+                Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid));
+        }
+    }
+
+    private List<CDCIndexData> parseCDCIndexData()
+    {
+        List<CDCIndexData> results = new ArrayList<>();
+        try
+        {
+            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+            {
+                if (f.getName().contains("_cdc.idx"))
+                    results.add(new CDCIndexData(f));
+            }
+        }
+        catch (IOException e)
+        {
+            Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage()));
+        }
+        return results;
+    }
+
+    private static class CDCIndexData
+    {
+        private final String fileName;
+        private final int offset;
+
+        CDCIndexData(File f) throws IOException
+        {
+            String line;
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f))))
+            {
+                line = br.readLine();
+            }
+            fileName = f.getName();
+            offset = Integer.parseInt(line);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s,%d", fileName, offset);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof CDCIndexData))
+                return false;
+            CDCIndexData cid = (CDCIndexData)other;
+            return fileName.equals(cid.fileName) && offset == cid.offset;
+        }
+    }
+
     private ByteBuffer randomizeBuffer(int size)
     {
         byte[] toWrap = new byte[size];
@@ -211,9 +424,15 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
         return new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length;
     }
 
-    private void expectCurrentCDCState(CDCState state)
+    private void expectCurrentCDCState(CDCState expectedState)
     {
-        Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.",
-            state, CommitLog.instance.segmentManager.allocatingFrom().getCDCState());
+        CDCState currentState = CommitLog.instance.segmentManager.allocatingFrom().getCDCState();
+        if (currentState != expectedState)
+        {
+            logger.error("expectCurrentCDCState violation! Expected state: {}. Found state: {}. Current CDC allocation: {}",
+                         expectedState, currentState, ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).updateCDCTotalSize());
+            Assert.fail(String.format("Received unexpected CDCState on current allocatingFrom segment. Expected: %s. Received: %s",
+                        expectedState, currentState));
+        }
     }
 }
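
The test above matches old and new CDC index entries with two nested loops. The same post-replay invariant (every pre-replay index file still exists, and its offset never regresses) can be expressed more directly with a map keyed by segment file name. A minimal, self-contained sketch; the `IndexEntry` record and `missingAfterReplay` method are illustrative stand-ins, not part of this patch:

```java
import java.util.*;

public class CdcIndexDiff
{
    // Minimal stand-in for the test's CDCIndexData: a segment file name plus its flushed offset.
    record IndexEntry(String fileName, int offset) {}

    // Returns names of pre-replay entries missing from the post-replay set, and fails fast
    // if any surviving entry's offset went backwards (replay should only extend coverage).
    static List<String> missingAfterReplay(List<IndexEntry> old, List<IndexEntry> fresh)
    {
        Map<String, Integer> byName = new HashMap<>();
        for (IndexEntry e : fresh)
            byName.put(e.fileName(), e.offset());

        List<String> missing = new ArrayList<>();
        for (IndexEntry e : old)
        {
            Integer newOffset = byName.get(e.fileName());
            if (newOffset == null)
                missing.add(e.fileName());
            else if (newOffset < e.offset())
                throw new AssertionError("Replayed offset regressed for " + e.fileName());
        }
        return missing;
    }

    public static void main(String[] args)
    {
        List<IndexEntry> old = List.of(new IndexEntry("CommitLog-7-1.log", 100));
        List<IndexEntry> fresh = List.of(new IndexEntry("CommitLog-7-1.log", 100),
                                         new IndexEntry("CommitLog-7-2.log", 50));
        System.out.println(missingAfterReplay(old, fresh)); // prints []
    }
}
```

This trades the test's O(n^2) comparison for a single pass per list, which matters little at test scale but reads more clearly as a statement of the invariant.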

