Add Change Data Capture

Patch by jmckenzie; reviewed by cyeksigian and blambov for CASSANDRA-8844


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e31e2162
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e31e2162
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e31e2162

Branch: refs/heads/trunk
Commit: e31e216234c6b57a531cae607e0355666007deb2
Parents: ed538f9
Author: Josh McKenzie <jmcken...@apache.org>
Authored: Sun Mar 27 09:20:47 2016 -0400
Committer: Josh McKenzie <jmcken...@apache.org>
Committed: Thu Jun 16 10:01:39 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  32 +-
 build.xml                                       |  51 +-
 conf/cassandra.yaml                             |  26 +
 pylib/cqlshlib/cql3handling.py                  |   5 +-
 src/antlr/Parser.g                              |   3 +-
 .../org/apache/cassandra/config/Config.java     |   6 +
 .../cassandra/config/DatabaseDescriptor.java    |  86 ++-
 .../statements/CreateKeyspaceStatement.java     |   1 +
 .../cql3/statements/DropKeyspaceStatement.java  |   2 +-
 .../cql3/statements/TableAttributes.java        |   3 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  67 +--
 .../org/apache/cassandra/db/Directories.java    |  51 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  17 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  47 +-
 src/java/org/apache/cassandra/db/Mutation.java  |  18 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  26 +-
 src/java/org/apache/cassandra/db/WriteType.java |   3 +-
 .../AbstractCommitLogSegmentManager.java        | 584 +++++++++++++++++++
 .../db/commitlog/AbstractCommitLogService.java  |   3 +-
 .../cassandra/db/commitlog/CommitLog.java       | 157 +++--
 .../db/commitlog/CommitLogPosition.java         | 121 ++++
 .../db/commitlog/CommitLogReadHandler.java      |  76 +++
 .../cassandra/db/commitlog/CommitLogReader.java | 501 ++++++++++++++++
 .../db/commitlog/CommitLogReplayer.java         | 582 +++++++-----------
 .../db/commitlog/CommitLogSegment.java          | 110 ++--
 .../db/commitlog/CommitLogSegmentManager.java   | 567 ------------------
 .../commitlog/CommitLogSegmentManagerCDC.java   | 302 ++++++++++
 .../CommitLogSegmentManagerStandard.java        |  89 +++
 .../db/commitlog/CommitLogSegmentReader.java    | 366 ++++++++++++
 .../db/commitlog/CompressedSegment.java         |  12 +-
 .../db/commitlog/EncryptedSegment.java          |  18 +-
 .../db/commitlog/FileDirectSegment.java         |  73 +--
 .../db/commitlog/MemoryMappedSegment.java       |   6 +-
 .../cassandra/db/commitlog/ReplayPosition.java  | 178 ------
 .../cassandra/db/commitlog/SegmentReader.java   | 355 -----------
 .../db/commitlog/SimpleCachedBufferPool.java    | 118 ++++
 .../apache/cassandra/db/lifecycle/Tracker.java  |   8 +-
 .../apache/cassandra/db/view/TableViews.java    |   4 +-
 .../apache/cassandra/db/view/ViewManager.java   |   2 -
 .../io/sstable/format/SSTableReader.java        |   1 -
 .../metadata/LegacyMetadataSerializer.java      |  12 +-
 .../io/sstable/metadata/MetadataCollector.java  |  16 +-
 .../io/sstable/metadata/StatsMetadata.java      |  24 +-
 .../cassandra/metrics/CommitLogMetrics.java     |   9 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   6 +-
 .../apache/cassandra/schema/TableParams.java    |  23 +-
 .../cassandra/service/CassandraDaemon.java      |   4 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  36 +-
 .../utils/DirectorySizeCalculator.java          |  98 ++++
 .../cassandra/utils/JVMStabilityInspector.java  |   3 +-
 .../cassandra/utils/memory/BufferPool.java      |   2 +-
 test/conf/cassandra-murmur.yaml                 |   2 +
 test/conf/cassandra.yaml                        |   2 +
 test/conf/cdc.yaml                              |   1 +
 test/data/bloom-filter/ka/foo.cql               |   2 +-
 .../db/commitlog/CommitLogStressTest.java       | 123 ++--
 .../test/microbench/DirectorySizerBench.java    | 105 ++++
 .../OffsetAwareConfigurationLoader.java         |  13 +-
 .../cassandra/batchlog/BatchlogManagerTest.java |   4 +-
 .../apache/cassandra/cql3/CDCStatementTest.java |  50 ++
 .../org/apache/cassandra/cql3/CQLTester.java    |   4 +
 .../apache/cassandra/cql3/OutOfSpaceTest.java   |   2 +-
 .../cql3/validation/operations/CreateTest.java  |   5 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  10 +-
 .../db/commitlog/CommitLogReaderTest.java       | 267 +++++++++
 .../CommitLogSegmentManagerCDCTest.java         | 220 +++++++
 .../commitlog/CommitLogSegmentManagerTest.java  |  23 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 130 +++--
 .../db/commitlog/CommitLogTestReplayer.java     |  59 +-
 .../db/commitlog/CommitLogUpgradeTest.java      |  18 +-
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   4 +-
 .../db/commitlog/SegmentReaderTest.java         |   6 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  18 +-
 .../cassandra/hints/HintsEncryptionTest.java    |   2 +-
 .../metadata/MetadataSerializerTest.java        |   6 +-
 .../apache/cassandra/utils/KillerForTests.java  |  16 +
 77 files changed, 3907 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9fa673..9c44a63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Faster streaming (CASSANDRA-9766)
 * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
  * Add repaired percentage metric (CASSANDRA-11503)
+ * Add Change-Data-Capture (CASSANDRA-8844)
 Merged from 3.0:
  * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index f9430ac..aa2612d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -30,13 +30,13 @@ New features
    - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
      to time series compaction and new tables should use this instead of DTCS. See
      CASSANDRA-9666 for details.
-
-Deprecation
------------
-   - DateTieredCompactionStrategy has been deprecated - new tables should use
-     TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
-     cause increased compaction load for a while after the migration so make sure you run
-     tests before migrating. Read CASSANDRA-9666 for background on this.
+   - Change-Data-Capture is now available. See cassandra.yaml for cdc-specific flags and
+     a brief explanation of on-disk locations for archived data in CommitLog form. This can
+     be enabled via ALTER TABLE ... WITH cdc=true.
+     Upon flush, CommitLogSegments containing data for CDC-enabled tables are moved to
+     the data/cdc_raw directory until removed by the user, and writes to CDC-enabled tables
+     will be rejected with a WriteTimeoutException once cdc_total_space_in_mb is reached
+     between unflushed CommitLogSegments and cdc_raw.
 
 Upgrading
 ---------
@@ -47,6 +47,12 @@ Upgrading
      drop the old versions, and this _before_ upgrade (see CASSANDRA-10783 for more
       details).
 
+Deprecation
+-----------
+   - DateTieredCompactionStrategy has been deprecated - new tables should use
+     TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+     cause increased compaction load for a while after the migration so make sure you run
+     tests before migrating. Read CASSANDRA-9666 for background on this.
 
 3.7
 ===
@@ -59,6 +65,7 @@ Upgrading
     value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
     they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 
+
 3.6
 =====
 
@@ -89,6 +96,7 @@ New features
   - Filtering expressions are made more pluggable and can be added programmatically via
     a QueryHandler implementation. See CASSANDRA-11295 for more details.
 
+
 3.4
 =====
 
@@ -366,8 +374,8 @@ Upgrading
      the legacy tables, so clients experience no disruption. Issuing DCL
      statements during an upgrade is not supported.
      Once all nodes are upgraded, an operator with superuser privileges should
-     drop the legacy tables, system_auth.users, system_auth.credentials and 
-     system_auth.permissions. Doing so will prompt Cassandra to switch over to 
+     drop the legacy tables, system_auth.users, system_auth.credentials and
+     system_auth.permissions. Doing so will prompt Cassandra to switch over to
      the new tables without requiring any further intervention.
      While the legacy tables are present a restarted node will re-run the data
      conversion and report the outcome so that operators can verify that it is
@@ -536,8 +544,8 @@ Upgrading
     - cqlsh will now display timestamps with a UTC timezone. Previously,
       timestamps were displayed with the local timezone.
     - Commit log files are no longer recycled by default, due to negative
-      performance implications. This can be enabled again with the 
-      commitlog_segment_recycling option in your cassandra.yaml 
+      performance implications. This can be enabled again with the
+      commitlog_segment_recycling option in your cassandra.yaml
     - JMX methods set/getCompactionStrategyClass have been deprecated, use
       set/getCompactionParameters/set/getCompactionParametersJson instead
 
@@ -666,7 +674,7 @@ New features
 Upgrading
 ---------
    - commitlog_sync_batch_window_in_ms behavior has changed from the
-     maximum time to wait between fsync to the minimum time.  We are 
+     maximum time to wait between fsync to the minimum time.  We are
      working on making this more user-friendly (see CASSANDRA-9533) but in the
      meantime, this means 2.1 needs a much smaller batch window to keep
      writer threads from starving.  The suggested default is now 2ms.
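
For reference, the per-table switch the CDC note above describes is exercised by the new CDCStatementTest. A minimal sketch in that style, assuming only the CQLTester helpers (createTable/execute substitute the generated table name for %s; the actual test's contents may differ):

    import org.junit.Test;
    import org.apache.cassandra.cql3.CQLTester;

    public class CDCToggleTest extends CQLTester
    {
        @Test
        public void testToggleCdc() throws Throwable
        {
            // cdc is an ordinary table option: set at CREATE time...
            createTable("CREATE TABLE %s (k int PRIMARY KEY, v int) WITH cdc=true");
            // ...and toggled later via ALTER TABLE
            execute("ALTER TABLE %s WITH cdc=false");
            execute("ALTER TABLE %s WITH cdc=true");
        }
    }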

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index aa55809..b01e499 100644
--- a/build.xml
+++ b/build.xml
@@ -1240,9 +1240,20 @@
             <filelist dir="@{inputdir}" files="@{filelist}"/>
         </batchtest>
       </junit>
-      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog:@{poffset}"/>
-      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data:@{poffset}"/>
-      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches:@{poffset}"/>
+
+      <condition property="fileSep" value=";">
+        <os family="windows"/>
+      </condition>
+      <condition property="fileSep" else=":">
+        <isset property="fileSep"/>
+      </condition>
+      <fail unless="fileSep">Failed to set File Separator. This shouldn't happen.</fail>
+
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/cdc_raw${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches${fileSep}@{poffset}"/>
+      <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/hints${fileSep}@{poffset}"/>
     </sequential>
   </macrodef>
 
@@ -1337,6 +1348,25 @@
     </sequential>
   </macrodef>
 
+  <macrodef name="testlist-cdc">
+    <attribute name="test.file.list" />
+    <attribute name="testlist.offset" />
+    <sequential>
+      <property name="cdc_yaml" value="${build.test.dir}/cassandra.cdc.yaml"/>
+      <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}"
+                       exclude="**/*.java" timeout="${test.timeout}" testtag="cdc">
+        <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+        <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
+        <jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
+        <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
+        <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+        <jvmarg value="-Dcassandra.config=file:///${cdc_yaml}"/>
+        <jvmarg value="-Dcassandra.skip_sync=true" />
+        <jvmarg value="-Dcassandra.config.loader=org.apache.cassandra.OffsetAwareConfigurationLoader"/>
+      </testmacrohelper>
+    </sequential>
+  </macrodef>
+
   <!--
     Run named ant task with jacoco, such as "ant jacoco-run -Dtaskname=test"
     the target run must enable the jacoco agent if usejacoco is 'yes' -->
@@ -1377,13 +1407,26 @@
     <testparallel testdelegate="testlist-compression" />
   </target>
 
+  <target name="test-cdc" depends="build-test" description="Execute unit tests 
with change-data-capture enabled">
+    <property name="cdc_yaml" value="${build.test.dir}/cassandra.cdc.yaml"/>
+    <concat destfile="${cdc_yaml}">
+      <fileset file="${test.conf}/cassandra.yaml"/>
+      <fileset file="${test.conf}/cdc.yaml"/>
+    </concat>
+    <path id="all-test-classes-path">
+      <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+    </path>
+    <property name="all-test-classes" refid="all-test-classes-path"/>
+    <testparallel testdelegate="testlist-cdc" />
+  </target>
+
   <target name="msg-ser-gen-test" depends="build-test" description="Generates 
message serializations">
     <testmacro inputdir="${test.unit.src}"
         timeout="${test.timeout}" filter="**/SerializationsTest.java">
       <jvmarg value="-Dcassandra.test-serialization-writes=True"/>
     </testmacro>
   </target>
-  
+
   <target name="msg-ser-test" depends="build-test" description="Tests message 
serializations">
       <testmacro inputdir="${test.unit.src}" timeout="${test.timeout}"
                filter="**/SerializationsTest.java"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index dcd5278..1068508 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -193,6 +193,17 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
 # commitlog_directory: /var/lib/cassandra/commitlog
 
+# Enable / disable CDC functionality on a per-node basis. This modifies the logic used
+# for write path allocation rejection (standard: never reject. cdc: reject Mutation
+# containing a CDC-enabled table if at space limit in cdc_raw_directory).
+cdc_enabled: false
+
+# CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the
+# segment contains mutations for a CDC-enabled table. This should be placed on a
+# separate spindle from the data directories. If not set, the default directory is
+# $CASSANDRA_HOME/data/cdc_raw.
+# cdc_raw_directory: /var/lib/cassandra/cdc_raw
+
 # policy for data disk failures:
 # die: shut down gossip and client transports and kill the JVM for any fs errors or
 #      single-sstable errors, so the node can be replaced.
@@ -474,6 +485,21 @@ memtable_allocation_type: heap_buffers
 # avoid having memtable_flush_writers * data_file_directories > number of cores
 #memtable_flush_writers: 1
 
+# Total space to use for change-data-capture logs on disk.
+#
+# If space gets above this value, Cassandra will throw WriteTimeoutException
+# on Mutations including tables with CDC enabled. A CDCCompactor is responsible
+# for parsing the raw CDC logs and deleting them when parsing is completed.
+#
+# The default value is the min of 4096 MB and 1/8th of the total space
+# of the drive where cdc_raw_directory resides.
+# cdc_total_space_in_mb: 4096
+
+# When we hit our cdc_raw limit and the CDCCompactor is either running behind
+# or experiencing backpressure, we check at the following interval to see if any
+# new space for cdc-tracked tables has been made available. Defaults to 250ms.
+# cdc_free_space_check_interval_ms: 250
+
 # A fixed memory pool size in MB for SSTable index summaries. If left
 # empty, this will default to 5% of the heap size. If the memory usage of
 # all index summaries exceeds this limit, SSTables with low read rates will
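
The allocation policy these comments describe reduces to a small gate on the write path. A minimal sketch, assuming only the documented contract (illustrative names, not the actual CommitLogSegmentManagerCDC API):

    import java.util.concurrent.TimeUnit;
    import java.util.function.LongSupplier;

    final class CdcSpaceGate
    {
        private final long limitBytes;       // cdc_total_space_in_mb, converted to bytes
        private final long checkIntervalMs;  // cdc_free_space_check_interval_ms

        CdcSpaceGate(int totalSpaceMb, int checkIntervalMs)
        {
            this.limitBytes = totalSpaceMb * 1024L * 1024L;
            this.checkIntervalMs = checkIntervalMs;
        }

        // Standard tables never reject; CDC-tracked mutations are refused once
        // unflushed segments plus cdc_raw reach the limit. The caller surfaces a
        // WriteTimeoutException when this returns false.
        boolean allowAllocation(boolean trackedByCDC, LongSupplier usedBytes) throws InterruptedException
        {
            if (!trackedByCDC || usedBytes.getAsLong() < limitBytes)
                return true;
            // at the limit: poll once at the configured interval for freed space
            TimeUnit.MILLISECONDS.sleep(checkIntervalMs);
            return usedBytes.getAsLong() < limitBytes;
        }
    }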

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 31a8459..70e12d4 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -51,6 +51,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         ('default_time_to_live', None),
         ('speculative_retry', None),
         ('memtable_flush_period_in_ms', None),
+        ('cdc', None)
     )
 
     columnfamily_layout_map_options = (
@@ -478,6 +479,8 @@ def cf_prop_val_completer(ctxt, cass):
     if this_opt in ('min_compaction_threshold', 'max_compaction_threshold',
                     'gc_grace_seconds', 'min_index_interval', 'max_index_interval'):
         return [Hint('<integer>')]
+    if this_opt in ('cdc',):
+        return [Hint('<true|false>')]
     return [Hint('<option_value>')]
 
 
@@ -1125,7 +1128,8 @@ syntax_rules += r'''
                                 ;
 
 <cfamProperty> ::= <property>
-                 | "COMPACT" "STORAGE"
+                 | "COMPACT" "STORAGE" "CDC"
                  | "CLUSTERING" "ORDER" "BY" "(" <cfamOrdering>
                                                  ( "," <cfamOrdering> )* ")"
                  ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/antlr/Parser.g
----------------------------------------------------------------------
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index cdb2263..f61f464 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -100,7 +100,7 @@ options {
         if (map == null || map.entries == null || map.entries.isEmpty())
             return Collections.<String, String>emptyMap();
 
-        Map<String, String> res = new HashMap<String, String>(map.entries.size());
+        Map<String, String> res = new HashMap<>(map.entries.size());
 
         for (Pair<Term.Raw, Term.Raw> entry : map.entries)
         {
@@ -762,7 +762,6 @@ alterKeyspaceStatement returns [AlterKeyspaceStatement expr]
        K_WITH properties[attrs] { $expr = new AlterKeyspaceStatement(ks, attrs); }
     ;
 
-
 /**
  * ALTER COLUMN FAMILY <CF> ALTER <column> TYPE <newtype>;
 * ALTER COLUMN FAMILY <CF> ADD <column> <newtype>; | ALTER COLUMN FAMILY <CF> ADD (<column> <newtype>,<column1> <newtype1>..... <column n> <newtype n>)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c6fa4fe..c18d1e0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -198,6 +198,12 @@ public class Config
 
     public Integer max_mutation_size_in_kb;
 
+    // Change-data-capture logs
+    public Boolean cdc_enabled = false;
+    public String cdc_raw_directory;
+    public Integer cdc_total_space_in_mb;
+    public Integer cdc_free_space_check_interval_ms = 250;
+
     @Deprecated
     public int commitlog_periodic_queue_size = -1;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e348ec6..5b3e57d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -531,6 +531,14 @@ public class DatabaseDescriptor
             conf.hints_directory += File.separator + "hints";
         }
 
+        if (conf.cdc_raw_directory == null)
+        {
+            conf.cdc_raw_directory = System.getProperty("cassandra.storagedir", null);
+            if (conf.cdc_raw_directory == null)
+                throw new ConfigurationException("cdc_raw_directory is missing and -Dcassandra.storagedir is not set", false);
+            conf.cdc_raw_directory += File.separator + "cdc_raw";
+        }
+
         if (conf.commitlog_total_space_in_mb == null)
         {
             int preferredSize = 8192;
@@ -558,6 +566,38 @@ public class DatabaseDescriptor
             }
         }
 
+        if (conf.cdc_total_space_in_mb == null)
+        {
+            int preferredSize = 4096;
+            int minSize = 0;
+            try
+            {
+                // use 1/8th of available space.  See discussion on #10013 and #10199 on the CL, taking half that for CDC
+                minSize = Ints.checkedCast((guessFileStore(conf.cdc_raw_directory).getTotalSpace() / 1048576) / 8);
+            }
+            catch (IOException e)
+            {
+                logger.debug("Error checking disk space", e);
+                throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions",
+                                                               conf.cdc_raw_directory), e);
+            }
+            if (minSize < preferredSize)
+            {
+                logger.warn("Small cdc volume detected at {}; setting cdc_total_space_in_mb to {}.  You can override this in cassandra.yaml",
+                            conf.cdc_raw_directory, minSize);
+                conf.cdc_total_space_in_mb = minSize;
+            }
+            else
+            {
+                conf.cdc_total_space_in_mb = preferredSize;
+            }
+        }
+
+        if (conf.cdc_enabled)
+        {
+            logger.info("cdc_enabled is true. Starting Cassandra node with Change-Data-Capture enabled.");
+        }
+
         if (conf.saved_caches_directory == null)
         {
            conf.saved_caches_directory = System.getProperty("cassandra.storagedir", null);
@@ -946,6 +986,13 @@ public class DatabaseDescriptor
             if (conf.saved_caches_directory == null)
                 throw new ConfigurationException("saved_caches_directory must 
be specified", false);
             FileUtils.createDirectory(conf.saved_caches_directory);
+
+            if (conf.cdc_enabled)
+            {
+                if (conf.cdc_raw_directory == null)
+                    throw new ConfigurationException("cdc_raw_directory must 
be specified", false);
+                FileUtils.createDirectory(conf.cdc_raw_directory);
+            }
         }
         catch (ConfigurationException e)
         {
@@ -1349,6 +1396,12 @@ public class DatabaseDescriptor
         return conf.commitlog_directory;
     }
 
+    @VisibleForTesting
+    public static void setCommitLogLocation(String value)
+    {
+        conf.commitlog_directory = value;
+    }
+
     public static ParameterizedClass getCommitLogCompression()
     {
         return conf.commitlog_compression;
@@ -1359,7 +1412,12 @@ public class DatabaseDescriptor
         conf.commitlog_compression = compressor;
     }
 
-    public static int getCommitLogMaxCompressionBuffersInPool()
+   /**
+    * Maximum number of buffers in the compression pool. The default value is 3; it should not be set lower than that
+    * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
+    * more, depending on how soon the sync policy stops all writing threads.
+    */
+    public static int getCommitLogMaxCompressionBuffersPerPool()
     {
         return conf.commitlog_max_compression_buffers_in_pool;
     }
@@ -2121,6 +2179,32 @@ public class DatabaseDescriptor
         return conf.gc_warn_threshold_in_ms;
     }
 
+    public static boolean isCDCEnabled()
+    {
+        return conf.cdc_enabled;
+    }
+
+    public static String getCDCLogLocation()
+    {
+        return conf.cdc_raw_directory;
+    }
+
+    public static Integer getCDCSpaceInMB()
+    {
+        return conf.cdc_total_space_in_mb;
+    }
+
+    @VisibleForTesting
+    public static void setCDCSpaceInMB(Integer input)
+    {
+        conf.cdc_total_space_in_mb = input;
+    }
+
+    public static Integer getCDCDiskCheckInterval()
+    {
+        return conf.cdc_free_space_check_interval_ms;
+    }
+
     @VisibleForTesting
     public static void setEncryptionContext(EncryptionContext ec)
     {
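
The cdc_total_space_in_mb fallback above takes the smaller of 4096 MB and an eighth of the volume hosting cdc_raw_directory. A hypothetical restatement, not a DatabaseDescriptor method (volumeTotalSpaceBytes would come from guessFileStore(cdc_raw_directory).getTotalSpace()):

    final class CdcDefaults
    {
        static int defaultCdcSpaceInMb(long volumeTotalSpaceBytes)
        {
            int preferredSize = 4096;                                 // 4 GB ceiling
            long oneEighthMb = (volumeTotalSpaceBytes / 1048576) / 8; // an eighth of the volume, in MB
            // e.g. a 16 GiB test volume yields min(4096, 2048) = 2048
            return (int) Math.min(preferredSize, oneEighthMb);
        }
    }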

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index 86754b6..f88c04f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.util.regex.Pattern;
+
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index 513ff1b..a08b193 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
index c1a9d54..dee3385 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
@@ -131,6 +131,9 @@ public final class TableAttributes extends PropertyDefinitions
         if (hasOption(Option.CRC_CHECK_CHANCE))
             builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
 
+        if (hasOption(Option.CDC))
+            builder.cdc(getBoolean(Option.CDC.toString(), false));
+
         return builder.build();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a5d27bf..523e15f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
@@ -814,7 +814,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param memtable
      */
-    public ListenableFuture<ReplayPosition> switchMemtableIfCurrent(Memtable memtable)
+    public ListenableFuture<CommitLogPosition> switchMemtableIfCurrent(Memtable memtable)
     {
         synchronized (data)
         {
@@ -831,14 +831,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
      * marked clean up to the position owned by the Memtable.
      */
-    public ListenableFuture<ReplayPosition> switchMemtable()
+    public ListenableFuture<CommitLogPosition> switchMemtable()
     {
         synchronized (data)
         {
             logFlush();
             Flush flush = new Flush(false);
             flushExecutor.execute(flush);
-            ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
+            ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(flush.postFlush);
             postFlushExecutor.submit(task);
             return task;
         }
@@ -881,7 +881,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
      *         to sstables for this table once the future completes
      */
-    public ListenableFuture<ReplayPosition> forceFlush()
+    public ListenableFuture<CommitLogPosition> forceFlush()
     {
         synchronized (data)
         {
@@ -900,7 +900,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
      *         to sstables for this table once the future completes
      */
-    public ListenableFuture<ReplayPosition> forceFlush(ReplayPosition flushIfDirtyBefore)
+    public ListenableFuture<?> forceFlush(CommitLogPosition flushIfDirtyBefore)
     {
        // we don't loop through the remaining memtables since here we only care about commit log dirtiness
         // and this does not vary between a table and its table-backed indexes
@@ -914,24 +914,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
      *         to sstables for this table once the future completes
      */
-    private ListenableFuture<ReplayPosition> waitForFlushes()
+    private ListenableFuture<CommitLogPosition> waitForFlushes()
     {
        // we grab the current memtable; once any preceding memtables have flushed, we know its
        // commitLogLowerBound has been set (as it is set with the upper bound of the preceding memtable)
         final Memtable current = data.getView().getCurrentMemtable();
-        ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>()
-        {
-            public ReplayPosition call()
-            {
-                logger.debug("forceFlush requested but everything is clean in {}", name);
-                return current.getCommitLogLowerBound();
-            }
+        ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(() -> {
+            logger.debug("forceFlush requested but everything is clean in {}", name);
+            return current.getCommitLogLowerBound();
         });
         postFlushExecutor.execute(task);
         return task;
     }
 
-    public ReplayPosition forceBlockingFlush()
+    public CommitLogPosition forceBlockingFlush()
     {
         return FBUtilities.waitOnFuture(forceFlush());
     }
@@ -940,18 +936,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
      * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
      */
-    private final class PostFlush implements Callable<ReplayPosition>
+    private final class PostFlush implements Callable<CommitLogPosition>
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
+        final CommitLogPosition commitLogUpperBound;
         volatile Throwable flushFailure = null;
-        final ReplayPosition commitLogUpperBound;
         final List<Memtable> memtables;
         final List<Collection<SSTableReader>> readers;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
-                          List<Memtable> memtables, List<Collection<SSTableReader>> readers)
+        private PostFlush(boolean flushSecondaryIndexes,
+                          OpOrder.Barrier writeBarrier,
+                          CommitLogPosition commitLogUpperBound,
+                          List<Memtable> memtables,
+                          List<Collection<SSTableReader>> readers)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
@@ -960,7 +959,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             this.readers = readers;
         }
 
-        public ReplayPosition call()
+        public CommitLogPosition call()
         {
             if (discardFlushResults == ColumnFamilyStore.this)
                 return commitLogUpperBound;
@@ -988,6 +987,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new IllegalStateException();
             }
 
+            // Must check commitLogUpperBound != null because Flush may find that all memtables are clean
+            // and so not set a commitLogUpperBound
             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
             if (flushFailure == null)
             {
@@ -1044,7 +1045,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             writeBarrier = keyspace.writeOrder.newBarrier();
 
            // submit flushes for the memtable for any indexed sub-cfses, and our own
-            AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>();
+            AtomicReference<CommitLogPosition> commitLogUpperBound = new AtomicReference<>();
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
                // switch all memtables, regardless of their dirty status, setting the barrier
@@ -1062,7 +1063,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
            // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
            // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
-            // replay positions have also completed, i.e. the memtables are done and ready to flush
+            // commit log segment positions have also completed, i.e. the memtables are done and ready to flush
            writeBarrier.issue();
            postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
         }
@@ -1215,16 +1216,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     // atomically set the upper bound for the commit log
-    private static void setCommitLogUpperBound(AtomicReference<ReplayPosition> commitLogUpperBound)
+    private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound)
     {
        // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
        // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
        // so that we know all operations prior to the position have not reached it yet
-        ReplayPosition lastReplayPosition;
+        CommitLogPosition lastReplayPosition;
         while (true)
         {
-            lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
-            ReplayPosition currentLast = commitLogUpperBound.get();
+            lastReplayPosition = new Memtable.LastCommitLogPosition(CommitLog.instance.getCurrentPosition());
+            CommitLogPosition currentLast = commitLogUpperBound.get();
             if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
                 && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
                 break;
@@ -1238,7 +1239,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public void simulateFailedFlush()
     {
         discardFlushResults = this;
-        data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this)));
+        data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), this)));
     }
 
     public void resumeFlushing()
@@ -1316,11 +1317,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
-    public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+    public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
 
     {
         long start = System.nanoTime();
-        Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+        Memtable mt = data.getMemtableFor(opGroup, commitLogPosition);
         long timeDelta = mt.put(update, indexer, opGroup);
         DecoratedKey key = update.partitionKey();
         invalidateCachedPartition(key);
@@ -2105,7 +2106,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
        // recording the timestamp IN BETWEEN those actions. Any sstables created
        // with this timestamp or greater time, will not be marked for delete.
        //
-        // Bonus complication: since we store replay position in sstable metadata,
+        // Bonus complication: since we store commit log segment position in sstable metadata,
        // truncating those sstables means we will replay any CL segments from the
        // beginning if we restart before they [the CL segments] are discarded for
         // normal reasons post-truncate.  To prevent this, we store truncation
@@ -2113,7 +2114,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         logger.trace("truncating {}", name);
 
         final long truncatedAt;
-        final ReplayPosition replayAfter;
+        final CommitLogPosition replayAfter;
 
        if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
         {
@@ -2169,7 +2170,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
     * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
      */
-    public Future<ReplayPosition> dumpMemtable()
+    public Future<CommitLogPosition> dumpMemtable()
     {
         synchronized (data)
         {
@@ -2422,7 +2423,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public Iterable<ColumnFamilyStore> concatWithIndexes()
     {
        // we return the main CFS first, which we rely on for simplicity in switchMemtable(), for getting the
-        // latest replay position
+        // latest commit log segment position
        return Iterables.concat(Collections.singleton(this), indexManager.getAllIndexColumnFamilyStores());
     }
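
setCommitLogUpperBound and Memtable.accepts form a small coordination protocol around an AtomicReference. The publishing side above is essentially a compare-and-set "max" loop; a condensed sketch with a generic comparable standing in for CommitLogPosition:

    import java.util.concurrent.atomic.AtomicReference;

    final class UpperBound<T extends Comparable<T>>
    {
        private final AtomicReference<T> bound = new AtomicReference<>();

        // install candidate unless a later position is already recorded
        void publish(T candidate)
        {
            while (true)
            {
                T current = bound.get();
                if (current != null && current.compareTo(candidate) > 0)
                    return; // somebody sneaked ahead; their position covers ours
                if (bound.compareAndSet(current, candidate))
                    return;
            }
        }
    }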
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 7876959..2a55992 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -23,22 +23,15 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOError;
 import java.io.IOException;
-import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.lang3.StringUtils;
@@ -49,9 +42,9 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.DirectorySizeCalculator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -814,7 +807,6 @@ public class Directories
         return snapshotSpaceMap;
     }
 
-
     public List<String> listEphemeralSnapshots()
     {
         final List<String> ephemeralSnapshots = new LinkedList<>();
@@ -928,7 +920,7 @@ public class Directories
         if (!input.isDirectory())
             return 0;
 
-        TrueFilesSizeVisitor visitor = new TrueFilesSizeVisitor();
+        SSTableSizeSummer visitor = new SSTableSizeSummer(sstableLister(Directories.OnTxnErr.THROW).listFiles());
         try
         {
             Files.walkFileTree(input.toPath(), visitor);
@@ -1012,22 +1004,15 @@ public class Directories
             dataDirectories[i] = new DataDirectory(new File(locations[i]));
     }
     
-    private class TrueFilesSizeVisitor extends SimpleFileVisitor<Path>
+    private class SSTableSizeSummer extends DirectorySizeCalculator
     {
-        private final AtomicLong size = new AtomicLong(0);
-        private final Set<String> visited = newHashSet(); //count each file only once
-        private final Set<String> alive;
-
-        TrueFilesSizeVisitor()
+        SSTableSizeSummer(List<File> files)
         {
-            super();
-            Builder<String> builder = ImmutableSet.builder();
-            for (File file : sstableLister(Directories.OnTxnErr.THROW).listFiles())
-                builder.add(file.getName());
-            alive = builder.build();
+            super(files);
         }
 
-        private boolean isAcceptable(Path file)
+        @Override
+        public boolean isAcceptable(Path file)
         {
             String fileName = file.toFile().getName();
            Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParent().toFile(), fileName);
@@ -1037,27 +1022,5 @@ public class Directories
                     && !visited.contains(fileName)
                     && !alive.contains(fileName);
         }
-
-        @Override
-        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
-        {
-            if (isAcceptable(file))
-            {
-                size.addAndGet(attrs.size());
-                visited.add(file.toFile().getName());
-            }
-            return FileVisitResult.CONTINUE;
-        }
-
-        @Override
-        public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException
-        {
-            return FileVisitResult.CONTINUE;
-        }
-        
-        public long getAllocatedSize()
-        {
-            return size.get();
-        }
     }
 }
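
The refactor above moves the generic tree-walking and size-summing out of TrueFilesSizeVisitor into the new DirectorySizeCalculator, leaving SSTableSizeSummer with only the acceptance predicate. The extracted shape is roughly the following (a simplification of the code deleted here, not the exact new class):

    import java.io.IOException;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Path;
    import java.nio.file.SimpleFileVisitor;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicLong;

    abstract class SizeVisitor extends SimpleFileVisitor<Path>
    {
        protected final Set<String> visited = new HashSet<>(); // count each file only once
        private final AtomicLong size = new AtomicLong(0);

        // subclasses decide which files count toward the total
        public abstract boolean isAcceptable(Path file);

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
        {
            if (isAcceptable(file))
            {
                size.addAndGet(attrs.size());
                visited.add(file.getFileName().toString());
            }
            return FileVisitResult.CONTINUE;
        }

        @Override
        public FileVisitResult visitFileFailed(Path file, IOException exc)
        {
            return FileVisitResult.CONTINUE; // tolerate files vanishing mid-walk
        }

        public long getAllocatedSize()
        {
            return size.get();
        }
    }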

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 022b996..6e44308 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,12 +26,14 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -48,12 +49,8 @@ import org.apache.cassandra.metrics.KeyspaceMetrics;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * It represents a Keyspace.
@@ -489,11 +486,11 @@ public class Keyspace
         try (OpOrder.Group opGroup = writeOrder.start())
         {
             // write the mutation to the commitlog and memtables
-            ReplayPosition replayPosition = null;
+            CommitLogPosition commitLogPosition = null;
             if (writeCommitLog)
             {
                 Tracing.trace("Appending to commitlog");
-                replayPosition = CommitLog.instance.add(mutation);
+                commitLogPosition = CommitLog.instance.add(mutation);
             }
 
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
@@ -526,7 +523,7 @@ public class Keyspace
                 UpdateTransaction indexTransaction = updateIndexes
                                                     ? cfs.indexManager.newUpdateTransaction(upd, opGroup, nowInSec)
                                                      : UpdateTransaction.NO_OP;
-                cfs.apply(upd, indexTransaction, opGroup, replayPosition);
+                cfs.apply(upd, indexTransaction, opGroup, commitLogPosition);
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 27c5372..7a46d8a 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -68,23 +68,24 @@ public class Memtable implements Comparable<Memtable>
 
     // the write barrier for directing writes to this memtable during a switch
     private volatile OpOrder.Barrier writeBarrier;
-    // the precise upper bound of ReplayPosition owned by this memtable
-    private volatile AtomicReference<ReplayPosition> commitLogUpperBound;
-    // the precise lower bound of ReplayPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
-    private AtomicReference<ReplayPosition> commitLogLowerBound;
-    // the approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor
+    // the precise lower bound of CommitLogPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
+    private AtomicReference<CommitLogPosition> commitLogLowerBound;
+
+    // The approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor
     // has been finalised, and this is enforced in the ColumnFamilyStore.setCommitLogUpperBound
-    private final ReplayPosition approximateCommitLogLowerBound = CommitLog.instance.getContext();
+    private final CommitLogPosition approximateCommitLogLowerBound = CommitLog.instance.getCurrentPosition();
 
     public int compareTo(Memtable that)
     {
        return this.approximateCommitLogLowerBound.compareTo(that.approximateCommitLogLowerBound);
     }
 
-    public static final class LastReplayPosition extends ReplayPosition
+    public static final class LastCommitLogPosition extends CommitLogPosition
     {
-        public LastReplayPosition(ReplayPosition copy) {
-            super(copy.segment, copy.position);
+        public LastCommitLogPosition(CommitLogPosition copy) {
+            super(copy.segmentId, copy.position);
         }
     }
 
@@ -107,7 +108,7 @@ public class Memtable implements Comparable<Memtable>
     private final StatsCollector statsCollector = new StatsCollector();
 
     // only to be used by init(), to setup the very first memtable for the cfs
-    public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs)
+    public Memtable(AtomicReference<CommitLogPosition> commitLogLowerBound, ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
         this.commitLogLowerBound = commitLogLowerBound;
@@ -143,10 +144,10 @@ public class Memtable implements Comparable<Memtable>
     }
 
     @VisibleForTesting
-    public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
+    public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
     {
         assert this.writeBarrier == null;
-        this.commitLogUpperBound = lastReplayPosition;
+        this.commitLogUpperBound = commitLogUpperBound;
         this.writeBarrier = writeBarrier;
         allocator.setDiscarding();
     }
@@ -157,7 +158,7 @@ public class Memtable implements Comparable<Memtable>
     }
 
    // decide if this memtable should take the write, or if it should go to the next memtable
-    public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
+    public boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
     {
        // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
         OpOrder.Barrier barrier = this.writeBarrier;
@@ -167,7 +168,7 @@ public class Memtable implements Comparable<Memtable>
         if (!barrier.isAfter(opGroup))
             return false;
         // if we aren't durable we are directed only by the barrier
-        if (replayPosition == null)
+        if (commitLogPosition == null)
             return true;
         while (true)
         {
@@ -176,17 +177,17 @@ public class Memtable implements Comparable<Memtable>
            // its current value and ours; if it HAS been finalised, we simply accept its judgement
            // this permits us to coordinate a safe boundary, as the boundary choice is made
            // atomically wrt our max() maintenance, so an operation cannot sneak into the past
-            ReplayPosition currentLast = commitLogUpperBound.get();
-            if (currentLast instanceof LastReplayPosition)
-                return currentLast.compareTo(replayPosition) >= 0;
-            if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
+            CommitLogPosition currentLast = commitLogUpperBound.get();
+            if (currentLast instanceof LastCommitLogPosition)
+                return currentLast.compareTo(commitLogPosition) >= 0;
+            if (currentLast != null && currentLast.compareTo(commitLogPosition) >= 0)
                return true;
-            if (commitLogUpperBound.compareAndSet(currentLast, replayPosition))
+            if (commitLogUpperBound.compareAndSet(currentLast, commitLogPosition))
                 return true;
         }
     }
 
-    public ReplayPosition getCommitLogLowerBound()
+    public CommitLogPosition getCommitLogLowerBound()
     {
         return commitLogLowerBound.get();
     }
@@ -201,7 +202,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.isEmpty();
     }
 
-    public boolean mayContainDataBefore(ReplayPosition position)
+    public boolean mayContainDataBefore(CommitLogPosition position)
     {
         return approximateCommitLogLowerBound.compareTo(position) < 0;
     }
@@ -219,7 +220,7 @@ public class Memtable implements Comparable<Memtable>
     * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
     * OpOrdering.
     *
-     * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
+     * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null
      */
    long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
     {
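
accepts() above is the consuming half of the boundary protocol: writers keep CAS-raising the upper bound to cover their own position until the flush publishes a finalised LastCommitLogPosition, after which comparison alone decides ownership. A condensed sketch, with a marker subclass standing in for LastCommitLogPosition:

    import java.util.concurrent.atomic.AtomicReference;

    final class Boundary
    {
        static class Position implements Comparable<Position>
        {
            final long value;
            Position(long value) { this.value = value; }
            public int compareTo(Position o) { return Long.compare(value, o.value); }
        }

        static final class FinalPosition extends Position // plays the role of LastCommitLogPosition
        {
            FinalPosition(long value) { super(value); }
        }

        private final AtomicReference<Position> upperBound = new AtomicReference<>();

        boolean accepts(Position write)
        {
            while (true)
            {
                Position current = upperBound.get();
                if (current instanceof FinalPosition)                // flush fixed the boundary
                    return current.compareTo(write) >= 0;
                if (current != null && current.compareTo(write) >= 0)
                    return true;                                     // already covered
                if (upperBound.compareAndSet(current, write))
                    return true;                                     // we raised the bound
            }
        }
    }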

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 4bad781..61e5ee9 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -61,6 +61,8 @@ public class Mutation implements IMutation
     // keep track of when mutation has started waiting for a MV partition lock
     public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
 
+    private boolean cdcEnabled = false;
+
     public Mutation(String keyspaceName, DecoratedKey key)
     {
         this(keyspaceName, key, new HashMap<>());
@@ -123,10 +125,21 @@ public class Mutation implements IMutation
         return modifications.get(cfId);
     }
 
+    /**
+     * Adds a PartitionUpdate to the local set of modifications.
+     * Assumes no existing update for the table this PartitionUpdate applies to.
+     *
+     * @param update PartitionUpdate to append to the modifications list
+     * @return this Mutation
+     * @throws IllegalArgumentException if a PartitionUpdate for a duplicate table is passed as argument
+     */
     public Mutation add(PartitionUpdate update)
     {
         assert update != null;
         assert update.partitionKey().getPartitioner() == key.getPartitioner();
+
+        cdcEnabled |= update.metadata().params.cdc;
+
         PartitionUpdate prev = modifications.put(update.metadata().cfId, update);
         if (prev != null)
             // developer error
@@ -256,6 +269,11 @@ public class Mutation implements IMutation
         return gcgs;
     }
 
+    public boolean trackedByCDC()
+    {
+        return cdcEnabled;
+    }
+
     public String toString()
     {
         return toString(false);

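Note how Mutation.add() above ORs each update's cdc flag into cdcEnabled, so a single CDC-enabled table marks the whole mutation as tracked. A minimal model of that accumulation (the boolean parameter stands in for update.metadata().params.cdc; this is a sketch, not the real API):

    // Sketch of the flag accumulation in Mutation.add()/trackedByCDC() above.
    class CdcFlagSketch
    {
        private boolean cdcEnabled = false;

        // stands in for add(PartitionUpdate); updateIsCdc models update.metadata().params.cdc
        CdcFlagSketch add(boolean updateIsCdc)
        {
            cdcEnabled |= updateIsCdc;
            return this;
        }

        boolean trackedByCDC()
        {
            return cdcEnabled;
        }
    }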
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index f25eb3c..026eba1 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -41,7 +41,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -455,7 +455,7 @@ public final class SystemKeyspace
                         .build();
     }
 
-    private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
+    private static volatile Map<UUID, Pair<CommitLogPosition, Long>> truncationRecords;
 
     public enum BootstrapState
     {
@@ -619,7 +619,7 @@ public final class SystemKeyspace
         return Pair.create(generation, lastKey);
     }
 
-    public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
         executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
@@ -638,11 +638,11 @@ public final class SystemKeyspace
         forceBlockingFlush(LOCAL);
     }
 
-    private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
+    private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position)
     {
         try (DataOutputBuffer out = new DataOutputBuffer())
         {
-            ReplayPosition.serializer.serialize(position, out);
+            CommitLogPosition.serializer.serialize(position, out);
             out.writeLong(truncatedAt);
             return singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
         }
         }
@@ -652,30 +652,30 @@ public final class SystemKeyspace
         }
     }
 
-    public static ReplayPosition getTruncatedPosition(UUID cfId)
+    public static CommitLogPosition getTruncatedPosition(UUID cfId)
     {
-        Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+        Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId);
         return record == null ? null : record.left;
     }
 
     public static long getTruncatedAt(UUID cfId)
     {
-        Pair<ReplayPosition, Long> record = getTruncationRecord(cfId);
+        Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId);
         return record == null ? Long.MIN_VALUE : record.right;
     }
 
-    private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
+    private static synchronized Pair<CommitLogPosition, Long> getTruncationRecord(UUID cfId)
     {
         if (truncationRecords == null)
             truncationRecords = readTruncationRecords();
         return truncationRecords.get(cfId);
     }
 
-    private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
+    private static Map<UUID, Pair<CommitLogPosition, Long>> readTruncationRecords()
     {
         UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));
 
-        Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();
+        Map<UUID, Pair<CommitLogPosition, Long>> records = new HashMap<>();
 
         if (!rows.isEmpty() && rows.one().has("truncated_at"))
         {
@@ -687,11 +687,11 @@ public final class SystemKeyspace
         return records;
     }
 
-    private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
+    private static Pair<CommitLogPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
     {
         try (RebufferingInputStream in = new DataInputBuffer(bytes, true))
         {
-            return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
+            return Pair.create(CommitLogPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
         }
         catch (IOException e)
         {

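The truncation-record helpers above pack a CommitLogPosition and the truncation timestamp into a single blob, and tolerate older records that lack the trailing long (hence the in.available() > 0 check). A self-contained sketch of that round trip in plain java.io; the two position fields written here (segment id, then offset) are an assumption for illustration, since the serializer itself is not part of this hunk:

    import java.io.*;
    import java.nio.ByteBuffer;

    // Sketch of the truncation-record blob layout used above.
    final class TruncationBlobSketch
    {
        // stands in for CommitLogPosition.serializer.serialize + out.writeLong(truncatedAt)
        static ByteBuffer write(long segmentId, int offset, long truncatedAt) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                out.writeLong(segmentId); // assumed position field
                out.writeInt(offset);     // assumed position field
                out.writeLong(truncatedAt);
            }
            return ByteBuffer.wrap(bytes.toByteArray());
        }

        // mirrors truncationRecordFromBlob: older records may omit the timestamp
        static long readTruncatedAt(ByteBuffer blob) throws IOException
        {
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(blob.array(), blob.arrayOffset(), blob.remaining())))
            {
                in.readLong(); // skip assumed segment id
                in.readInt();  // skip assumed offset
                return in.available() > 0 ? in.readLong() : Long.MIN_VALUE;
            }
        }
    }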
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index fdbe97d..11909e7 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -25,5 +25,6 @@ public enum WriteType
     COUNTER,
     BATCH_LOG,
     CAS,
-    VIEW;
+    VIEW,
+    CDC;
 }

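The new WriteType.CDC constant gives callers a distinct failure category for CDC writes. A hedged sketch of branching on it client-side (how CDC-space exhaustion maps onto this write type is handled elsewhere in the patch, not in this hunk):

    import org.apache.cassandra.db.WriteType;

    // Sketch only: a client-side error handler branching on the new constant.
    final class WriteFailureSketch
    {
        static String describe(WriteType writeType)
        {
            switch (writeType)
            {
                case CDC:
                    return "CDC write rejected; CDC commit log space may be exhausted";
                default:
                    return "write failed with type " + writeType;
            }
        }
    }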
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
new file mode 100644
index 0000000..b8f0a4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+    // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
+    private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
+
+    /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
+    private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+
+    /** Active segments, containing unflushed data */
+    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+    /** The segment we are currently allocating commit log records to */
+    protected volatile CommitLogSegment allocatingFrom = null;
+
+    private final WaitQueue hasAvailableSegments = new WaitQueue();
+
+    final String storageDirectory;
+
+    /**
+     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
+     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+     * on the manager thread, which will take a ms or two).
+     */
+    private final AtomicLong size = new AtomicLong();
+
+    /**
+     * New segment creation is initially disabled because we'll typically get some "free" segments
+     * recycled after log replay.
+     */
+    volatile boolean createReserveSegments = false;
+
+    // Used by tests to determine if segment manager is active or not.
+    volatile boolean processingTask = false;
+
+    private Thread managerThread;
+    protected volatile boolean run = true;
+    protected final CommitLog commitLog;
+
+    private static final SimpleCachedBufferPool bufferPool =
+        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersPerPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+    {
+        this.commitLog = commitLog;
+        this.storageDirectory = storageDirectory;
+    }
+
+    void start()
+    {
+        // The run loop for the manager thread
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws Exception
+            {
+                while (run)
+                {
+                    try
+                    {
+                        Runnable task = segmentManagementTasks.poll();
+                        processingTask = true;
+                        if (task == null)
+                        {
+                            // if we have no more work to do, check if we should create a new segment
+                            if (!atSegmentLimit() &&
+                                availableSegments.isEmpty() &&
+                                (activeSegments.isEmpty() || createReserveSegments))
+                            {
+                                logger.trace("No segments in reserve; creating a fresh one");
+                                // TODO : some error handling in case we fail to create a new segment
+                                availableSegments.add(createSegment());
+                                hasAvailableSegments.signalAll();
+                            }
+
+                            // flush old Cfs if we're full
+                            long unused = unusedCapacity();
+                            if (unused < 0)
+                            {
+                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+                                long spaceToReclaim = 0;
+                                for (CommitLogSegment segment : activeSegments)
+                                {
+                                    if (segment == allocatingFrom)
+                                        break;
+                                    segmentsToRecycle.add(segment);
+                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+                                    if (spaceToReclaim + unused >= 0)
+                                        break;
+                                }
+                                flushDataFrom(segmentsToRecycle, false);
+                            }
+
+                            // Since we're operating on a "null" allocation task, block here for the next task on the
+                            // queue rather than looping, grabbing another null, and repeating the above work.
+                            try
+                            {
+                                processingTask = false;
+                                task = segmentManagementTasks.take();
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new AssertionError();
+                            }
+                        }
+                        task.run();
+                        processingTask = false;
+                    }
+                    catch (Throwable t)
+                    {
+                        JVMStabilityInspector.inspectThrowable(t);
+                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+                            return;
+                        // sleep some arbitrary period to avoid spamming CL
+                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+                    }
+                }
+            }
+
+            private boolean atSegmentLimit()
+            {
+                return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+            }
+        };
+
+        run = true;
+
+        managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+        managerThread.start();
+    }
+
+
+    /**
+     * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
+     */
+    public abstract void shutdown();
+
+    /**
+     * Allocate a segment within this CLSM. Should either succeed or throw.
+     */
+    public abstract Allocation allocate(Mutation mutation, int size);
+
+    /**
+     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSMs
+     * decide what to do with those segments on disk after they've been replayed.
+     */
+    abstract void handleReplayedSegment(final File file);
+
+    /**
+     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
+     * submitted to the segment manager so it's performed on the segment management thread.
+     */
+    abstract CommitLogSegment createSegment();
+
+    /**
+     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the
+     * segment manager so it's performed on the segment management thread, or perform while the segment management
+     * thread is shut down during testing resets.
+     *
+     * @param segment segment to be discarded
+     * @param delete  whether or not the segment is safe to be deleted.
+     */
+    abstract void discard(CommitLogSegment segment, boolean delete);
+
+
+    /**
+     * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
+     * is working on initial allocation of a CommitLogSegment.
+     */
+    CommitLogSegment allocatingFrom()
+    {
+        CommitLogSegment r = allocatingFrom;
+        if (r == null)
+        {
+            advanceAllocatingFrom(null);
+            r = allocatingFrom;
+        }
+        return r;
+    }
+
+    /**
+     * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
+     * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
+     *
+     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+     */
+    protected void advanceAllocatingFrom(CommitLogSegment old)
+    {
+        while (true)
+        {
+            CommitLogSegment next;
+            synchronized (this)
+            {
+                // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
+                // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
+                if (allocatingFrom != old)
+                    return;
+                next = availableSegments.poll();
+                if (next != null)
+                {
+                    allocatingFrom = next;
+                    activeSegments.add(next);
+                }
+            }
+
+            if (next != null)
+            {
+                if (old != null)
+                {
+                    // Now we can run the user defined command just after switching to the new commit log.
+                    // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+                    commitLog.archiver.maybeArchive(old);
+
+                    // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+                    old.discardUnusedTail();
+                }
+
+                // request that the CL be synced out-of-band, as we've finished a segment
+                commitLog.requestExtraSync();
+                return;
+            }
+
+            // no more segments, so register to receive a signal when not empty
+            WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+
+            // trigger the management thread; this must occur after registering
+            // the signal to ensure we are woken by any new segment creation
+            wakeManager();
+
+            // check if the queue has already been added to before waiting on the signal, to catch modifications
+            // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
+            if (!availableSegments.isEmpty() || allocatingFrom != old)
+            {
+                signal.cancel();
+                // if we've been beaten, just stop immediately
+                if (allocatingFrom != old)
+                    return;
+                // otherwise try again, as there should be an available segment
+                continue;
+            }
+
+            // can only reach here if the queue hasn't been inserted into
+            // before we registered the signal, as we only remove items from the queue
+            // after updating allocatingFrom. Can safely block until we are signalled
+            // by the allocator that new segments have been published
+            signal.awaitUninterruptibly();
+        }
+    }
+
+    protected void wakeManager()
+    {
+        // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
+        segmentManagementTasks.add(Runnables.doNothing());
+    }
+
+    /**
+     * Switch to a new segment, regardless of how much is left in the current one.
+     *
+     * Flushes any dirty CFs for this segment and any older segments, and then recycles
+     * the segments
+     */
+    void forceRecycleAll(Iterable<UUID> droppedCfs)
+    {
+        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+        advanceAllocatingFrom(last);
+
+        // wait for the commit log modifications
+        last.waitForModifications();
+
+        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+        // on the relevant keyspaces to complete
+        Keyspace.writeOrder.awaitNewBarrier();
+
+        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+        Future<?> future = flushDataFrom(segmentsToRecycle, true);
+        try
+        {
+            future.get();
+
+            for (CommitLogSegment segment : activeSegments)
+                for (UUID cfId : droppedCfs)
+                    segment.markClean(cfId, segment.getCurrentCommitLogPosition());
+
+            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+            // if the previous active segment was the only one to recycle (since an active segment isn't
+            // necessarily dirty, and we only call dCS after a flush).
+            for (CommitLogSegment segment : activeSegments)
+            {
+                if (segment.isUnused())
+                    recycleSegment(segment);
+            }
+
+            CommitLogSegment first;
+            if ((first = activeSegments.peek()) != null && first.id <= last.id)
+                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+        }
+        catch (Throwable t)
+        {
+            // for now just log the error
+            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+        }
+    }
+
+    /**
+     * Indicates that a segment is no longer in use and that it should be recycled.
+     *
+     * @param segment segment that is no longer in use
+     */
+    void recycleSegment(final CommitLogSegment segment)
+    {
+        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+        if (activeSegments.remove(segment))
+        {
+            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+            discardSegment(segment, archiveSuccess);
+        }
+        else
+        {
+            logger.warn("segment {} not found in activeSegments queue", segment);
+        }
+    }
+
+    /**
+     * Indicates that a segment file should be deleted.
+     *
+     * @param segment segment to be discarded
+     */
+    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
+    {
+        logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
+        segmentManagementTasks.add(() -> discard(segment, deleteFile));
+    }
+
+    /**
+     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+     * @param addedSize the size delta to apply, in bytes (may be negative)
+     */
+    void addSize(long addedSize)
+    {
+        size.addAndGet(addedSize);
+    }
+
+    /**
+     * @return the space (in bytes) used by all segment files.
+     */
+    public long onDiskSize()
+    {
+        return size.get();
+    }
+
+    private long unusedCapacity()
+    {
+        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+        long currentSize = size.get();
+        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+        return total - currentSize;
+    }
+
+    /**
+     * @param name the filename to check
+     * @return true if file is managed by this manager.
+     */
+    public boolean manages(String name)
+    {
+        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+            if (segment.getName().equals(name))
+                return true;
+        return false;
+    }
+
+    /**
+     * Throws a flag that enables the behavior of keeping at least one spare segment
+     * available at all times.
+     */
+    void enableReserveSegmentCreation()
+    {
+        createReserveSegments = true;
+        wakeManager();
+    }
+
+    /**
+     * Force a flush on all CFs that are still dirty in @param segments.
+     *
+     * @return a Future that will finish when all the flushes are complete.
+     */
+    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+    {
+        if (segments.isEmpty())
+            return Futures.immediateFuture(null);
+        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+        for (CommitLogSegment segment : segments)
+        {
+            for (UUID dirtyCFId : segment.getDirtyCFIDs())
+            {
+                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+                if (pair == null)
+                {
+                    // even though we remove the schema entry before a final flush when dropping a CF,
+                    // it's still possible for a writer to race and finish his append after the flush.
+                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+                    segment.markClean(dirtyCFId, segment.getCurrentCommitLogPosition());
+                }
+                else if (!flushes.containsKey(dirtyCFId))
+                {
+                    String keyspace = pair.left;
+                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+                    // no deadlock possibility since switchLock removal
+                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+                }
+            }
+        }
+
+        return Futures.allAsList(flushes.values());
+    }
+
+    /**
+     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+     * Only call this after the AbstractCommitLogService is shut down.
+     */
+    public void stopUnsafe(boolean deleteSegments)
+    {
+        logger.trace("CLSM closing and clearing existing commit log segments...");
+        createReserveSegments = false;
+
+        awaitManagementTasksCompletion();
+
+        shutdown();
+        try
+        {
+            awaitTermination();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        synchronized (this)
+        {
+            for (CommitLogSegment segment : activeSegments)
+                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+            activeSegments.clear();
+
+            for (CommitLogSegment segment : availableSegments)
+                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+            availableSegments.clear();
+        }
+
+        allocatingFrom = null;
+
+        segmentManagementTasks.clear();
+
+        size.set(0L);
+
+        logger.trace("CLSM done with closing and clearing existing commit log segments.");
+    }
+
+    // Used by tests only.
+    void awaitManagementTasksCompletion()
+    {
+        while (segmentManagementTasks.size() > 0 || processingTask)
+            Thread.yield();
+    }
+
+    /**
+     * Explicitly for use only during resets in unit testing.
+     */
+    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+    {
+        try
+        {
+            discard(segment, delete);
+        }
+        catch (AssertionError ignored)
+        {
+            // segment file does not exist
+        }
+    }
+
+    /**
+     * Returns when the management thread terminates.
+     */
+    public void awaitTermination() throws InterruptedException
+    {
+        managerThread.join();
+
+        for (CommitLogSegment segment : activeSegments)
+            segment.close();
+
+        for (CommitLogSegment segment : availableSegments)
+            segment.close();
+
+        bufferPool.shutdown();
+    }
+
+    /**
+     * @return a read-only collection of the active commit log segments
+     */
+    @VisibleForTesting
+    public Collection<CommitLogSegment> getActiveSegments()
+    {
+        return Collections.unmodifiableCollection(activeSegments);
+    }
+
+    /**
+     * @return the current CommitLogPosition of the active segment we're allocating from
+     */
+    CommitLogPosition getCurrentPosition()
+    {
+        return allocatingFrom().getCurrentCommitLogPosition();
+    }
+
+    /**
+     * Forces a disk flush on the commit log files that need it.  Blocking.
+     */
+    public void sync(boolean syncAllSegments) throws IOException
+    {
+        CommitLogSegment current = allocatingFrom();
+        for (CommitLogSegment segment : getActiveSegments())
+        {
+            if (!syncAllSegments && segment.id > current.id)
+                return;
+            segment.sync();
+        }
+    }
+
+    /**
+     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+     */
+    SimpleCachedBufferPool getBufferPool()
+    {
+        return bufferPool;
+    }
+}
+

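A subtle point in advanceAllocatingFrom() above is the ordering: register for the signal, wake the manager, re-check the queue, and only then block. A self-contained sketch of that consumer/producer handshake using plain monitors in place of Cassandra's WaitQueue (an assumption made so the example compiles on its own):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch of the register-then-recheck idiom from advanceAllocatingFrom().
    class SegmentWaiterSketch
    {
        private final Queue<String> availableSegments = new ConcurrentLinkedQueue<>();
        private final Object hasAvailableSegments = new Object();

        String takeSegment() throws InterruptedException
        {
            while (true)
            {
                String next = availableSegments.poll();
                if (next != null)
                    return next;

                synchronized (hasAvailableSegments)
                {
                    // re-check after taking the monitor to catch a publish that
                    // landed between the poll above and this block, for the same
                    // reason the CLSM re-checks availableSegments after registering
                    if (availableSegments.isEmpty())
                        hasAvailableSegments.wait();
                }
            }
        }

        void publishSegment(String segment)
        {
            availableSegments.add(segment);
            synchronized (hasAvailableSegments)
            {
                hasAvailableSegments.notifyAll();
            }
        }
    }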
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..0ba4f55 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -29,7 +29,6 @@ import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 
 public abstract class AbstractCommitLogService
 {
-
     private Thread thread;
     private volatile boolean shutdown = false;
 
@@ -89,7 +88,7 @@ public abstract class AbstractCommitLogService
 
                         // sync and signal
                         long syncStarted = System.currentTimeMillis();
-                        //This is a target for Byteman in CommitLogSegmentManagerTest
+                        // This is a target for Byteman in CommitLogSegmentManagerTest
                         commitLog.sync(shutdown);
                         lastSyncedAt = syncStarted;
                         syncComplete.signalAll();

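One detail worth keeping from the AbstractCommitLogService hunk above: syncStarted is captured before commitLog.sync() runs, so lastSyncedAt never claims more progress than was actually synced when a sync takes a while. Schematically (the Runnables stand in for the real sync and signal calls):

    // Sketch of the sync-and-signal step shown in the hunk above.
    class SyncStepSketch
    {
        volatile long lastSyncedAt = 0;

        void syncOnce(Runnable syncCommitLog, Runnable signalWaiters)
        {
            long syncStarted = System.currentTimeMillis();
            syncCommitLog.run();  // stands in for commitLog.sync(shutdown)
            lastSyncedAt = syncStarted;
            signalWaiters.run();  // stands in for syncComplete.signalAll()
        }
    }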