Add Change Data Capture

Patch by jmckenzie; reviewed by cyeksigian and blambov for CASSANDRA-8844
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e31e2162 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e31e2162 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e31e2162 Branch: refs/heads/trunk Commit: e31e216234c6b57a531cae607e0355666007deb2 Parents: ed538f9 Author: Josh McKenzie <jmcken...@apache.org> Authored: Sun Mar 27 09:20:47 2016 -0400 Committer: Josh McKenzie <jmcken...@apache.org> Committed: Thu Jun 16 10:01:39 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 32 +- build.xml | 51 +- conf/cassandra.yaml | 26 + pylib/cqlshlib/cql3handling.py | 5 +- src/antlr/Parser.g | 3 +- .../org/apache/cassandra/config/Config.java | 6 + .../cassandra/config/DatabaseDescriptor.java | 86 ++- .../statements/CreateKeyspaceStatement.java | 1 + .../cql3/statements/DropKeyspaceStatement.java | 2 +- .../cql3/statements/TableAttributes.java | 3 + .../apache/cassandra/db/ColumnFamilyStore.java | 67 +-- .../org/apache/cassandra/db/Directories.java | 51 +- src/java/org/apache/cassandra/db/Keyspace.java | 17 +- src/java/org/apache/cassandra/db/Memtable.java | 47 +- src/java/org/apache/cassandra/db/Mutation.java | 18 + .../org/apache/cassandra/db/SystemKeyspace.java | 26 +- src/java/org/apache/cassandra/db/WriteType.java | 3 +- .../AbstractCommitLogSegmentManager.java | 584 +++++++++++++++++++ .../db/commitlog/AbstractCommitLogService.java | 3 +- .../cassandra/db/commitlog/CommitLog.java | 157 +++-- .../db/commitlog/CommitLogPosition.java | 121 ++++ .../db/commitlog/CommitLogReadHandler.java | 76 +++ .../cassandra/db/commitlog/CommitLogReader.java | 501 ++++++++++++++++ .../db/commitlog/CommitLogReplayer.java | 582 +++++++----------- .../db/commitlog/CommitLogSegment.java | 110 ++-- .../db/commitlog/CommitLogSegmentManager.java | 567 ------------------ .../commitlog/CommitLogSegmentManagerCDC.java | 302 ++++++++++ .../CommitLogSegmentManagerStandard.java | 89 +++ .../db/commitlog/CommitLogSegmentReader.java | 366 ++++++++++++ .../db/commitlog/CompressedSegment.java | 12 +- .../db/commitlog/EncryptedSegment.java | 18 +- .../db/commitlog/FileDirectSegment.java | 73 +-- .../db/commitlog/MemoryMappedSegment.java | 6 +- .../cassandra/db/commitlog/ReplayPosition.java | 178 ------ .../cassandra/db/commitlog/SegmentReader.java | 355 ----------- .../db/commitlog/SimpleCachedBufferPool.java | 118 ++++ .../apache/cassandra/db/lifecycle/Tracker.java | 8 +- .../apache/cassandra/db/view/TableViews.java | 4 +- .../apache/cassandra/db/view/ViewManager.java | 2 - .../io/sstable/format/SSTableReader.java | 1 - .../metadata/LegacyMetadataSerializer.java | 12 +- .../io/sstable/metadata/MetadataCollector.java | 16 +- .../io/sstable/metadata/StatsMetadata.java | 24 +- .../cassandra/metrics/CommitLogMetrics.java | 9 +- .../apache/cassandra/schema/SchemaKeyspace.java | 6 +- .../apache/cassandra/schema/TableParams.java | 23 +- .../cassandra/service/CassandraDaemon.java | 4 +- .../cassandra/streaming/StreamReceiveTask.java | 36 +- .../utils/DirectorySizeCalculator.java | 98 ++++ .../cassandra/utils/JVMStabilityInspector.java | 3 +- .../cassandra/utils/memory/BufferPool.java | 2 +- test/conf/cassandra-murmur.yaml | 2 + test/conf/cassandra.yaml | 2 + test/conf/cdc.yaml | 1 + test/data/bloom-filter/ka/foo.cql | 2 +- .../db/commitlog/CommitLogStressTest.java | 123 ++-- .../test/microbench/DirectorySizerBench.java | 105 ++++ .../OffsetAwareConfigurationLoader.java | 13 +- 
.../cassandra/batchlog/BatchlogManagerTest.java | 4 +- .../apache/cassandra/cql3/CDCStatementTest.java | 50 ++ .../org/apache/cassandra/cql3/CQLTester.java | 4 + .../apache/cassandra/cql3/OutOfSpaceTest.java | 2 +- .../cql3/validation/operations/CreateTest.java | 5 +- .../apache/cassandra/db/ReadMessageTest.java | 10 +- .../db/commitlog/CommitLogReaderTest.java | 267 +++++++++ .../CommitLogSegmentManagerCDCTest.java | 220 +++++++ .../commitlog/CommitLogSegmentManagerTest.java | 23 +- .../cassandra/db/commitlog/CommitLogTest.java | 130 +++-- .../db/commitlog/CommitLogTestReplayer.java | 59 +- .../db/commitlog/CommitLogUpgradeTest.java | 18 +- .../db/commitlog/CommitLogUpgradeTestMaker.java | 4 +- .../db/commitlog/SegmentReaderTest.java | 6 +- .../cassandra/db/lifecycle/TrackerTest.java | 18 +- .../cassandra/hints/HintsEncryptionTest.java | 2 +- .../metadata/MetadataSerializerTest.java | 6 +- .../apache/cassandra/utils/KillerForTests.java | 16 + 77 files changed, 3907 insertions(+), 2096 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c9fa673..9c44a63 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Faster streaming (CASSANDRA-9766) * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425) * Add repaired percentage metric (CASSANDRA-11503) + * Add Change-Data-Capture (CASSANDRA-8844) Merged from 3.0: * Add TimeWindowCompactionStrategy (CASSANDRA-9666) Merged from 2.2: http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index f9430ac..aa2612d 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -30,13 +30,13 @@ New features - TimeWindowCompactionStrategy has been added. This has proven to be a better approach to time series compaction and new tables should use this instead of DTCS. See CASSANDRA-9666 for details. - -Deprecation ------------ - - DateTieredCompactionStrategy has been deprecated - new tables should use - TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might - cause increased compaction load for a while after the migration so make sure you run - tests before migrating. Read CASSANDRA-9666 for background on this. + - Change-Data-Capture is now available. See cassandra.yaml and for cdc-specific flags and + a brief explanation of on-disk locations for archived data in CommitLog form. This can + be enabled via ALTER TABLE ... WITH cdc=true. + Upon flush, CommitLogSegments containing data for CDC-enabled tables are moved to + the data/cdc_raw directory until removed by the user and writes to CDC-enabled tables + will be rejected with a WriteTimeoutException once cdc_total_space_in_mb is reached + between unflushed CommitLogSegments and cdc_raw. Upgrading --------- @@ -47,6 +47,12 @@ Upgrading drop the old versions, and this _before_ upgrade (see CASSANDRA-10783 for more details). +Deprecation +----------- + - DateTieredCompactionStrategy has been deprecated - new tables should use + TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might + cause increased compaction load for a while after the migration so make sure you run + tests before migrating. Read CASSANDRA-9666 for background on this. 
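For context on the new table property described above, a minimal sketch of enabling CDC from cqlsh follows; the keyspace and table names are invented for illustration, and the node itself must also be running with cdc_enabled: true in cassandra.yaml:

    CREATE TABLE demo.orders (id uuid PRIMARY KEY, total decimal) WITH cdc=true;

    -- or switch it on for an existing table
    ALTER TABLE demo.orders WITH cdc=true;

    -- and disable it again
    ALTER TABLE demo.orders WITH cdc=false;

Once commit log segments containing writes to such a table are flushed, they are moved to data/cdc_raw (or cdc_raw_directory, if overridden) and stay there until a consumer removes them.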
3.7 === @@ -59,6 +65,7 @@ Upgrading value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details. + 3.6 ===== @@ -89,6 +96,7 @@ New features - Filtering expressions are made more pluggable and can be added programatically via a QueryHandler implementation. See CASSANDRA-11295 for more details. + 3.4 ===== @@ -366,8 +374,8 @@ Upgrading the legacy tables, so clients experience no disruption. Issuing DCL statements during an upgrade is not supported. Once all nodes are upgraded, an operator with superuser privileges should - drop the legacy tables, system_auth.users, system_auth.credentials and - system_auth.permissions. Doing so will prompt Cassandra to switch over to + drop the legacy tables, system_auth.users, system_auth.credentials and + system_auth.permissions. Doing so will prompt Cassandra to switch over to the new tables without requiring any further intervention. While the legacy tables are present a restarted node will re-run the data conversion and report the outcome so that operators can verify that it is @@ -536,8 +544,8 @@ Upgrading - cqlsh will now display timestamps with a UTC timezone. Previously, timestamps were displayed with the local timezone. - Commit log files are no longer recycled by default, due to negative - performance implications. This can be enabled again with the - commitlog_segment_recycling option in your cassandra.yaml + performance implications. This can be enabled again with the + commitlog_segment_recycling option in your cassandra.yaml - JMX methods set/getCompactionStrategyClass have been deprecated, use set/getCompactionParameters/set/getCompactionParametersJson instead @@ -666,7 +674,7 @@ New features Upgrading --------- - commitlog_sync_batch_window_in_ms behavior has changed from the - maximum time to wait between fsync to the minimum time. We are + maximum time to wait between fsync to the minimum time. We are working on making this more user-friendly (see CASSANDRA-9533) but in the meantime, this means 2.1 needs a much smaller batch window to keep writer threads from starving. The suggested default is now 2ms. http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index aa55809..b01e499 100644 --- a/build.xml +++ b/build.xml @@ -1240,9 +1240,20 @@ <filelist dir="@{inputdir}" files="@{filelist}"/> </batchtest> </junit> - <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog:@{poffset}"/> - <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data:@{poffset}"/> - <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches:@{poffset}"/> + + <condition property="fileSep" value=";"> + <os family="windows"/> + </condition> + <condition property="fileSep" else=":"> + <isset property="fileSep"/> + </condition> + <fail unless="fileSep">Failed to set File Separator. 
This shouldn't happen.</fail> + + <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/commitlog${fileSep}@{poffset}"/> + <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/cdc_raw${fileSep}@{poffset}"/> + <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/data${fileSep}@{poffset}"/> + <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/saved_caches${fileSep}@{poffset}"/> + <delete quiet="true" failonerror="false" dir="${build.test.dir}/cassandra/hints${fileSep}@{poffset}"/> </sequential> </macrodef> @@ -1337,6 +1348,25 @@ </sequential> </macrodef> + <macrodef name="testlist-cdc"> + <attribute name="test.file.list" /> + <attribute name="testlist.offset" /> + <sequential> + <property name="cdc_yaml" value="${build.test.dir}/cassandra.cdc.yaml"/> + <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" + exclude="**/*.java" timeout="${test.timeout}" testtag="cdc"> + <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/> + <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/> + <jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/> + <jvmarg value="-Dcassandra.ring_delay_ms=1000"/> + <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/> + <jvmarg value="-Dcassandra.config=file:///${cdc_yaml}"/> + <jvmarg value="-Dcassandra.skip_sync=true" /> + <jvmarg value="-Dcassandra.config.loader=org.apache.cassandra.OffsetAwareConfigurationLoader"/> + </testmacrohelper> + </sequential> + </macrodef> + <!-- Run named ant task with jacoco, such as "ant jacoco-run -Dtaskname=test" the target run must enable the jacoco agent if usejacoco is 'yes' --> @@ -1377,13 +1407,26 @@ <testparallel testdelegate="testlist-compression" /> </target> + <target name="test-cdc" depends="build-test" description="Execute unit tests with change-data-capture enabled"> + <property name="cdc_yaml" value="${build.test.dir}/cassandra.cdc.yaml"/> + <concat destfile="${cdc_yaml}"> + <fileset file="${test.conf}/cassandra.yaml"/> + <fileset file="${test.conf}/cdc.yaml"/> + </concat> + <path id="all-test-classes-path"> + <fileset dir="${test.unit.src}" includes="**/${test.name}.java" /> + </path> + <property name="all-test-classes" refid="all-test-classes-path"/> + <testparallel testdelegate="testlist-cdc" /> + </target> + <target name="msg-ser-gen-test" depends="build-test" description="Generates message serializations"> <testmacro inputdir="${test.unit.src}" timeout="${test.timeout}" filter="**/SerializationsTest.java"> <jvmarg value="-Dcassandra.test-serialization-writes=True"/> </testmacro> </target> - + <target name="msg-ser-test" depends="build-test" description="Tests message serializations"> <testmacro inputdir="${test.unit.src}" timeout="${test.timeout}" filter="**/SerializationsTest.java"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index dcd5278..1068508 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -193,6 +193,17 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner # If not set, the default directory is $CASSANDRA_HOME/data/commitlog. # commitlog_directory: /var/lib/cassandra/commitlog +# Enable / disable CDC functionality on a per-node basis. This modifies the logic used +# for write path allocation rejection (standard: never reject. 
cdc: reject Mutation +# containing a CDC-enabled table if at space limit in cdc_raw_directory). +cdc_enabled: false + +# CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the +# segment contains mutations for a CDC-enabled table. This should be placed on a +# separate spindle than the data directories. If not set, the default directory is +# $CASSANDRA_HOME/data/cdc_raw. +# cdc_raw_directory: /var/lib/cassandra/cdc_raw + # policy for data disk failures: # die: shut down gossip and client transports and kill the JVM for any fs errors or # single-sstable errors, so the node can be replaced. @@ -474,6 +485,21 @@ memtable_allocation_type: heap_buffers # avoid having memtable_flush_writers * data_file_directories > number of cores #memtable_flush_writers: 1 +# Total space to use for change-data-capture logs on disk. +# +# If space gets above this value, Cassandra will throw WriteTimeoutException +# on Mutations including tables with CDC enabled. A CDCCompactor is responsible +# for parsing the raw CDC logs and deleting them when parsing is completed. +# +# The default value is the min of 4096 mb and 1/8th of the total space +# of the drive where cdc_raw_directory resides. +# cdc_total_space_in_mb: 4096 + +# When we hit our cdc_raw limit and the CDCCompactor is either running behind +# or experiencing backpressure, we check at the following interval to see if any +# new space for cdc-tracked tables has been made available. Default to 250ms +# cdc_free_space_check_interval_ms: 250 + # A fixed memory pool size in MB for for SSTable index summaries. If left # empty, this will default to 5% of the heap size. If the memory usage of # all index summaries exceeds this limit, SSTables with low read rates will http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 31a8459..70e12d4 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -51,6 +51,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): ('default_time_to_live', None), ('speculative_retry', None), ('memtable_flush_period_in_ms', None), + ('cdc', None) ) columnfamily_layout_map_options = ( @@ -478,6 +479,8 @@ def cf_prop_val_completer(ctxt, cass): if this_opt in ('min_compaction_threshold', 'max_compaction_threshold', 'gc_grace_seconds', 'min_index_interval', 'max_index_interval'): return [Hint('<integer>')] + if this_opt in ('cdc'): + return [Hint('<true|false>')] return [Hint('<option_value>')] @@ -1125,7 +1128,7 @@ syntax_rules += r''' ; <cfamProperty> ::= <property> - | "COMPACT" "STORAGE" + | "COMPACT" "STORAGE" "CDC" | "CLUSTERING" "ORDER" "BY" "(" <cfamOrdering> ( "," <cfamOrdering> )* ")" ; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/antlr/Parser.g ---------------------------------------------------------------------- diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g index cdb2263..f61f464 100644 --- a/src/antlr/Parser.g +++ b/src/antlr/Parser.g @@ -100,7 +100,7 @@ options { if (map == null || map.entries == null || map.entries.isEmpty()) return Collections.<String, String>emptyMap(); - Map<String, String> res = new HashMap<String, String>(map.entries.size()); + Map<String, String> res = new HashMap<>(map.entries.size()); for (Pair<Term.Raw, Term.Raw> entry : map.entries) { @@ -762,7 +762,6 @@ alterKeyspaceStatement returns [AlterKeyspaceStatement expr] K_WITH 
properties[attrs] { $expr = new AlterKeyspaceStatement(ks, attrs); } ; - /** * ALTER COLUMN FAMILY <CF> ALTER <column> TYPE <newtype>; * ALTER COLUMN FAMILY <CF> ADD <column> <newtype>; | ALTER COLUMN FAMILY <CF> ADD (<column> <newtype>,<column1> <newtype1>..... <column n> <newtype n>) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index c6fa4fe..c18d1e0 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -198,6 +198,12 @@ public class Config public Integer max_mutation_size_in_kb; + // Change-data-capture logs + public Boolean cdc_enabled = false; + public String cdc_raw_directory; + public Integer cdc_total_space_in_mb; + public Integer cdc_free_space_check_interval_ms = 250; + @Deprecated public int commitlog_periodic_queue_size = -1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index e348ec6..5b3e57d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -531,6 +531,14 @@ public class DatabaseDescriptor conf.hints_directory += File.separator + "hints"; } + if (conf.cdc_raw_directory == null) + { + conf.cdc_raw_directory = System.getProperty("cassandra.storagedir", null); + if (conf.cdc_raw_directory == null) + throw new ConfigurationException("cdc_raw_directory is missing and -Dcassandra.storagedir is not set", false); + conf.cdc_raw_directory += File.separator + "cdc_raw"; + } + if (conf.commitlog_total_space_in_mb == null) { int preferredSize = 8192; @@ -558,6 +566,38 @@ public class DatabaseDescriptor } } + if (conf.cdc_total_space_in_mb == null) + { + int preferredSize = 4096; + int minSize = 0; + try + { + // use 1/8th of available space. See discussion on #10013 and #10199 on the CL, taking half that for CDC + minSize = Ints.checkedCast((guessFileStore(conf.cdc_raw_directory).getTotalSpace() / 1048576) / 8); + } + catch (IOException e) + { + logger.debug("Error checking disk space", e); + throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions", + conf.cdc_raw_directory), e); + } + if (minSize < preferredSize) + { + logger.warn("Small cdc volume detected at {}; setting cdc_total_space_in_mb to {}. You can override this in cassandra.yaml", + conf.cdc_raw_directory, minSize); + conf.cdc_total_space_in_mb = minSize; + } + else + { + conf.cdc_total_space_in_mb = preferredSize; + } + } + + if (conf.cdc_enabled != null) + { + logger.info("cdc_enabled is true. 
Starting casssandra node with Change-Data-Capture enabled."); + } + if (conf.saved_caches_directory == null) { conf.saved_caches_directory = System.getProperty("cassandra.storagedir", null); @@ -946,6 +986,13 @@ public class DatabaseDescriptor if (conf.saved_caches_directory == null) throw new ConfigurationException("saved_caches_directory must be specified", false); FileUtils.createDirectory(conf.saved_caches_directory); + + if (conf.cdc_enabled) + { + if (conf.cdc_raw_directory == null) + throw new ConfigurationException("cdc_raw_directory must be specified", false); + FileUtils.createDirectory(conf.cdc_raw_directory); + } } catch (ConfigurationException e) { @@ -1349,6 +1396,12 @@ public class DatabaseDescriptor return conf.commitlog_directory; } + @VisibleForTesting + public static void setCommitLogLocation(String value) + { + conf.commitlog_directory = value; + } + public static ParameterizedClass getCommitLogCompression() { return conf.commitlog_compression; @@ -1359,7 +1412,12 @@ public class DatabaseDescriptor conf.commitlog_compression = compressor; } - public static int getCommitLogMaxCompressionBuffersInPool() + /** + * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that + * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use + * more, depending on how soon the sync policy stops all writing threads. + */ + public static int getCommitLogMaxCompressionBuffersPerPool() { return conf.commitlog_max_compression_buffers_in_pool; } @@ -2121,6 +2179,32 @@ public class DatabaseDescriptor return conf.gc_warn_threshold_in_ms; } + public static boolean isCDCEnabled() + { + return conf.cdc_enabled; + } + + public static String getCDCLogLocation() + { + return conf.cdc_raw_directory; + } + + public static Integer getCDCSpaceInMB() + { + return conf.cdc_total_space_in_mb; + } + + @VisibleForTesting + public static void setCDCSpaceInMB(Integer input) + { + conf.cdc_total_space_in_mb = input; + } + + public static Integer getCDCDiskCheckInterval() + { + return conf.cdc_free_space_check_interval_ms; + } + @VisibleForTesting public static void setEncryptionContext(EncryptionContext ec) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java index 86754b6..f88c04f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java @@ -18,6 +18,7 @@ package org.apache.cassandra.cql3.statements; import java.util.regex.Pattern; + import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java index 513ff1b..a08b193 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.cql3.statements; +import org.apache.cassandra.auth.Permission; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.auth.Permission; import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java index c1a9d54..dee3385 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java +++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java @@ -131,6 +131,9 @@ public final class TableAttributes extends PropertyDefinitions if (hasOption(Option.CRC_CHECK_CHANCE)) builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE)); + if (hasOption(Option.CDC)) + builder.cdc(getBoolean(Option.CDC.toString(), false)); + return builder.build(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index a5d27bf..523e15f 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -44,7 +44,7 @@ import org.apache.cassandra.cache.*; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; @@ -814,7 +814,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * * @param memtable */ - public ListenableFuture<ReplayPosition> switchMemtableIfCurrent(Memtable memtable) + public ListenableFuture<CommitLogPosition> switchMemtableIfCurrent(Memtable memtable) { synchronized (data) { @@ -831,14 +831,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL * marked clean up to the position owned by the Memtable. 
*/ - public ListenableFuture<ReplayPosition> switchMemtable() + public ListenableFuture<CommitLogPosition> switchMemtable() { synchronized (data) { logFlush(); Flush flush = new Flush(false); flushExecutor.execute(flush); - ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush); + ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(flush.postFlush); postFlushExecutor.submit(task); return task; } @@ -881,7 +881,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a Future yielding the commit log position that can be guaranteed to have been successfully written * to sstables for this table once the future completes */ - public ListenableFuture<ReplayPosition> forceFlush() + public ListenableFuture<CommitLogPosition> forceFlush() { synchronized (data) { @@ -900,7 +900,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a Future yielding the commit log position that can be guaranteed to have been successfully written * to sstables for this table once the future completes */ - public ListenableFuture<ReplayPosition> forceFlush(ReplayPosition flushIfDirtyBefore) + public ListenableFuture<?> forceFlush(CommitLogPosition flushIfDirtyBefore) { // we don't loop through the remaining memtables since here we only care about commit log dirtiness // and this does not vary between a table and its table-backed indexes @@ -914,24 +914,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return a Future yielding the commit log position that can be guaranteed to have been successfully written * to sstables for this table once the future completes */ - private ListenableFuture<ReplayPosition> waitForFlushes() + private ListenableFuture<CommitLogPosition> waitForFlushes() { // we grab the current memtable; once any preceding memtables have flushed, we know its // commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable) final Memtable current = data.getView().getCurrentMemtable(); - ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>() - { - public ReplayPosition call() - { - logger.debug("forceFlush requested but everything is clean in {}", name); - return current.getCommitLogLowerBound(); - } + ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(() -> { + logger.debug("forceFlush requested but everything is clean in {}", name); + return current.getCommitLogLowerBound(); }); postFlushExecutor.execute(task); return task; } - public ReplayPosition forceBlockingFlush() + public CommitLogPosition forceBlockingFlush() { return FBUtilities.waitOnFuture(forceFlush()); } @@ -940,18 +936,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed. 
*/ - private final class PostFlush implements Callable<ReplayPosition> + private final class PostFlush implements Callable<CommitLogPosition> { final boolean flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); + final CommitLogPosition commitLogUpperBound; volatile Throwable flushFailure = null; - final ReplayPosition commitLogUpperBound; final List<Memtable> memtables; final List<Collection<SSTableReader>> readers; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound, - List<Memtable> memtables, List<Collection<SSTableReader>> readers) + private PostFlush(boolean flushSecondaryIndexes, + OpOrder.Barrier writeBarrier, + CommitLogPosition commitLogUpperBound, + List<Memtable> memtables, + List<Collection<SSTableReader>> readers) { this.writeBarrier = writeBarrier; this.flushSecondaryIndexes = flushSecondaryIndexes; @@ -960,7 +959,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.readers = readers; } - public ReplayPosition call() + public CommitLogPosition call() { if (discardFlushResults == ColumnFamilyStore.this) return commitLogUpperBound; @@ -988,6 +987,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean throw new IllegalStateException(); } + // Must check commitLogUpperBound != null because Flush may find that all memtables are clean + // and so not set a commitLogUpperBound // If a flush errored out but the error was ignored, make sure we don't discard the commit log. if (flushFailure == null) { @@ -1044,7 +1045,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean writeBarrier = keyspace.writeOrder.newBarrier(); // submit flushes for the memtable for any indexed sub-cfses, and our own - AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>(); + AtomicReference<CommitLogPosition> commitLogUpperBound = new AtomicReference<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { // switch all memtables, regardless of their dirty status, setting the barrier @@ -1062,7 +1063,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete; // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier - // replay positions have also completed, i.e. the memtables are done and ready to flush + // commit log segment position have also completed, i.e. the memtables are done and ready to flush writeBarrier.issue(); postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers); } @@ -1215,16 +1216,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // atomically set the upper bound for the commit log - private static void setCommitLogUpperBound(AtomicReference<ReplayPosition> commitLogUpperBound) + private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound) { // we attempt to set the holder to the current commit log context. 
at the same time all writes to the memtables are // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry, // so that we know all operations prior to the position have not reached it yet - ReplayPosition lastReplayPosition; + CommitLogPosition lastReplayPosition; while (true) { - lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext()); - ReplayPosition currentLast = commitLogUpperBound.get(); + lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition())); + CommitLogPosition currentLast = commitLogUpperBound.get(); if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0) && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition)) break; @@ -1238,7 +1239,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void simulateFailedFlush() { discardFlushResults = this; - data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this))); + data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), this))); } public void resumeFlushing() @@ -1316,11 +1317,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * param @ key - key for update/insert * param @ columnFamily - columnFamily changes */ - public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) + public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, CommitLogPosition commitLogPosition) { long start = System.nanoTime(); - Memtable mt = data.getMemtableFor(opGroup, replayPosition); + Memtable mt = data.getMemtableFor(opGroup, commitLogPosition); long timeDelta = mt.put(update, indexer, opGroup); DecoratedKey key = update.partitionKey(); invalidateCachedPartition(key); @@ -2105,7 +2106,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // recording the timestamp IN BETWEEN those actions. Any sstables created // with this timestamp or greater time, will not be marked for delete. // - // Bonus complication: since we store replay position in sstable metadata, + // Bonus complication: since we store commit log segment position in sstable metadata, // truncating those sstables means we will replay any CL segments from the // beginning if we restart before they [the CL segments] are discarded for // normal reasons post-truncate. To prevent this, we store truncation @@ -2113,7 +2114,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean logger.trace("truncating {}", name); final long truncatedAt; - final ReplayPosition replayAfter; + final CommitLogPosition replayAfter; if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot()) { @@ -2169,7 +2170,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable. 
*/ - public Future<ReplayPosition> dumpMemtable() + public Future<CommitLogPosition> dumpMemtable() { synchronized (data) { @@ -2422,7 +2423,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public Iterable<ColumnFamilyStore> concatWithIndexes() { // we return the main CFS first, which we rely on for simplicity in switchMemtable(), for getting the - // latest replay position + // latest commit log segment position return Iterables.concat(Collections.singleton(this), indexManager.getAllIndexColumnFamilyStores()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 7876959..2a55992 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -23,22 +23,15 @@ import java.io.File; import java.io.FileFilter; import java.io.IOError; import java.io.IOException; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; -import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Iterables; import org.apache.commons.lang3.StringUtils; @@ -49,9 +42,9 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.utils.DirectorySizeCalculator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -814,7 +807,6 @@ public class Directories return snapshotSpaceMap; } - public List<String> listEphemeralSnapshots() { final List<String> ephemeralSnapshots = new LinkedList<>(); @@ -928,7 +920,7 @@ public class Directories if (!input.isDirectory()) return 0; - TrueFilesSizeVisitor visitor = new TrueFilesSizeVisitor(); + SSTableSizeSummer visitor = new SSTableSizeSummer(sstableLister(Directories.OnTxnErr.THROW).listFiles()); try { Files.walkFileTree(input.toPath(), visitor); @@ -1012,22 +1004,15 @@ public class Directories dataDirectories[i] = new DataDirectory(new File(locations[i])); } - private class TrueFilesSizeVisitor extends SimpleFileVisitor<Path> + private class SSTableSizeSummer extends DirectorySizeCalculator { - private final AtomicLong size = new AtomicLong(0); - private final Set<String> visited = newHashSet(); //count each file only once - private final Set<String> alive; - - TrueFilesSizeVisitor() + SSTableSizeSummer(List<File> files) { - super(); - Builder<String> builder = ImmutableSet.builder(); - for (File file : sstableLister(Directories.OnTxnErr.THROW).listFiles()) - builder.add(file.getName()); - alive = builder.build(); + super(files); } - 
private boolean isAcceptable(Path file) + @Override + public boolean isAcceptable(Path file) { String fileName = file.toFile().getName(); Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParent().toFile(), fileName); @@ -1037,27 +1022,5 @@ public class Directories && !visited.contains(fileName) && !alive.contains(fileName); } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException - { - if (isAcceptable(file)) - { - size.addAndGet(attrs.size()); - visited.add(file.toFile().getName()); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException - { - return FileVisitResult.CONTINUE; - } - - public long getAllocatedSize() - { - return size.get(); - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 022b996..6e44308 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -27,12 +26,14 @@ import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -48,12 +49,8 @@ import org.apache.cassandra.metrics.KeyspaceMetrics; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * It represents a Keyspace. @@ -489,11 +486,11 @@ public class Keyspace try (OpOrder.Group opGroup = writeOrder.start()) { // write the mutation to the commitlog and memtables - ReplayPosition replayPosition = null; + CommitLogPosition commitLogPosition = null; if (writeCommitLog) { Tracing.trace("Appending to commitlog"); - replayPosition = CommitLog.instance.add(mutation); + commitLogPosition = CommitLog.instance.add(mutation); } for (PartitionUpdate upd : mutation.getPartitionUpdates()) @@ -526,7 +523,7 @@ public class Keyspace UpdateTransaction indexTransaction = updateIndexes ? 
cfs.indexManager.newUpdateTransaction(upd, opGroup, nowInSec) : UpdateTransaction.NO_OP; - cfs.apply(upd, indexTransaction, opGroup, replayPosition); + cfs.apply(upd, indexTransaction, opGroup, commitLogPosition); if (requiresViewUpdate) baseComplete.set(System.currentTimeMillis()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 27c5372..7a46d8a 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -33,7 +33,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -68,23 +68,24 @@ public class Memtable implements Comparable<Memtable> // the write barrier for directing writes to this memtable during a switch private volatile OpOrder.Barrier writeBarrier; - // the precise upper bound of ReplayPosition owned by this memtable - private volatile AtomicReference<ReplayPosition> commitLogUpperBound; - // the precise lower bound of ReplayPosition owned by this memtable; equal to its predecessor's commitLogUpperBound - private AtomicReference<ReplayPosition> commitLogLowerBound; - // the approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor + // the precise upper bound of CommitLogPosition owned by this memtable + private volatile AtomicReference<CommitLogPosition> commitLogUpperBound; + // the precise lower bound of CommitLogPosition owned by this memtable; equal to its predecessor's commitLogUpperBound + private AtomicReference<CommitLogPosition> commitLogLowerBound; + + // The approximate lower bound by this memtable; must be <= commitLogLowerBound once our predecessor // has been finalised, and this is enforced in the ColumnFamilyStore.setCommitLogUpperBound - private final ReplayPosition approximateCommitLogLowerBound = CommitLog.instance.getContext(); + private final CommitLogPosition approximateCommitLogLowerBound = CommitLog.instance.getCurrentPosition(); public int compareTo(Memtable that) { return this.approximateCommitLogLowerBound.compareTo(that.approximateCommitLogLowerBound); } - public static final class LastReplayPosition extends ReplayPosition + public static final class LastCommitLogPosition extends CommitLogPosition { - public LastReplayPosition(ReplayPosition copy) { - super(copy.segment, copy.position); + public LastCommitLogPosition(CommitLogPosition copy) { + super(copy.segmentId, copy.position); } } @@ -107,7 +108,7 @@ public class Memtable implements Comparable<Memtable> private final StatsCollector statsCollector = new StatsCollector(); // only to be used by init(), to setup the very first memtable for the cfs - public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs) + public Memtable(AtomicReference<CommitLogPosition> commitLogLowerBound, ColumnFamilyStore cfs) { this.cfs = cfs; this.commitLogLowerBound = commitLogLowerBound; @@ -143,10 +144,10 
@@ public class Memtable implements Comparable<Memtable> } @VisibleForTesting - public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition) + public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound) { assert this.writeBarrier == null; - this.commitLogUpperBound = lastReplayPosition; + this.commitLogUpperBound = commitLogUpperBound; this.writeBarrier = writeBarrier; allocator.setDiscarding(); } @@ -157,7 +158,7 @@ public class Memtable implements Comparable<Memtable> } // decide if this memtable should take the write, or if it should go to the next memtable - public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition) + public boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition) { // if the barrier hasn't been set yet, then this memtable is still taking ALL writes OpOrder.Barrier barrier = this.writeBarrier; @@ -167,7 +168,7 @@ public class Memtable implements Comparable<Memtable> if (!barrier.isAfter(opGroup)) return false; // if we aren't durable we are directed only by the barrier - if (replayPosition == null) + if (commitLogPosition == null) return true; while (true) { @@ -176,17 +177,17 @@ public class Memtable implements Comparable<Memtable> // its current value and ours; if it HAS been finalised, we simply accept its judgement // this permits us to coordinate a safe boundary, as the boundary choice is made // atomically wrt our max() maintenance, so an operation cannot sneak into the past - ReplayPosition currentLast = commitLogUpperBound.get(); - if (currentLast instanceof LastReplayPosition) - return currentLast.compareTo(replayPosition) >= 0; - if (currentLast != null && currentLast.compareTo(replayPosition) >= 0) + CommitLogPosition currentLast = commitLogUpperBound.get(); + if (currentLast instanceof LastCommitLogPosition) + return currentLast.compareTo(commitLogPosition) >= 0; + if (currentLast != null && currentLast.compareTo(commitLogPosition) >= 0) return true; - if (commitLogUpperBound.compareAndSet(currentLast, replayPosition)) + if (commitLogUpperBound.compareAndSet(currentLast, commitLogPosition)) return true; } } - public ReplayPosition getCommitLogLowerBound() + public CommitLogPosition getCommitLogLowerBound() { return commitLogLowerBound.get(); } @@ -201,7 +202,7 @@ public class Memtable implements Comparable<Memtable> return partitions.isEmpty(); } - public boolean mayContainDataBefore(ReplayPosition position) + public boolean mayContainDataBefore(CommitLogPosition position) { return approximateCommitLogLowerBound.compareTo(position) < 0; } @@ -219,7 +220,7 @@ public class Memtable implements Comparable<Memtable> * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate * OpOrdering. 
* - * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null + * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null */ long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 4bad781..61e5ee9 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -61,6 +61,8 @@ public class Mutation implements IMutation // keep track of when mutation has started waiting for a MV partition lock public final AtomicLong viewLockAcquireStart = new AtomicLong(0); + private boolean cdcEnabled = false; + public Mutation(String keyspaceName, DecoratedKey key) { this(keyspaceName, key, new HashMap<>()); @@ -123,10 +125,21 @@ public class Mutation implements IMutation return modifications.get(cfId); } + /** + * Adds PartitionUpdate to the local set of modifications. + * Assumes no updates for the Table this PartitionUpdate impacts. + * + * @param update PartitionUpdate to append to Modifications list + * @return Mutation this mutation + * @throws IllegalArgumentException If PartitionUpdate for duplicate table is passed as argument + */ public Mutation add(PartitionUpdate update) { assert update != null; assert update.partitionKey().getPartitioner() == key.getPartitioner(); + + cdcEnabled |= update.metadata().params.cdc; + PartitionUpdate prev = modifications.put(update.metadata().cfId, update); if (prev != null) // developer error @@ -256,6 +269,11 @@ public class Mutation implements IMutation return gcgs; } + public boolean trackedByCDC() + { + return cdcEnabled; + } + public String toString() { return toString(false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index f25eb3c..026eba1 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -41,7 +41,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.functions.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -455,7 +455,7 @@ public final class SystemKeyspace .build(); } - private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; + private static volatile Map<UUID, Pair<CommitLogPosition, Long>> truncationRecords; public enum BootstrapState { @@ -619,7 +619,7 @@ public final class SystemKeyspace return Pair.create(generation, lastKey); } - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) + public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long 
truncatedAt, CommitLogPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); @@ -638,11 +638,11 @@ public final class SystemKeyspace forceBlockingFlush(LOCAL); } - private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) + private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, CommitLogPosition position) { try (DataOutputBuffer out = new DataOutputBuffer()) { - ReplayPosition.serializer.serialize(position, out); + CommitLogPosition.serializer.serialize(position, out); out.writeLong(truncatedAt); return singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); } @@ -652,30 +652,30 @@ public final class SystemKeyspace } } - public static ReplayPosition getTruncatedPosition(UUID cfId) + public static CommitLogPosition getTruncatedPosition(UUID cfId) { - Pair<ReplayPosition, Long> record = getTruncationRecord(cfId); + Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId); return record == null ? null : record.left; } public static long getTruncatedAt(UUID cfId) { - Pair<ReplayPosition, Long> record = getTruncationRecord(cfId); + Pair<CommitLogPosition, Long> record = getTruncationRecord(cfId); return record == null ? Long.MIN_VALUE : record.right; } - private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId) + private static synchronized Pair<CommitLogPosition, Long> getTruncationRecord(UUID cfId) { if (truncationRecords == null) truncationRecords = readTruncationRecords(); return truncationRecords.get(cfId); } - private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords() + private static Map<UUID, Pair<CommitLogPosition, Long>> readTruncationRecords() { UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>(); + Map<UUID, Pair<CommitLogPosition, Long>> records = new HashMap<>(); if (!rows.isEmpty() && rows.one().has("truncated_at")) { @@ -687,11 +687,11 @@ public final class SystemKeyspace return records; } - private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes) + private static Pair<CommitLogPosition, Long> truncationRecordFromBlob(ByteBuffer bytes) { try (RebufferingInputStream in = new DataInputBuffer(bytes, true)) { - return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); + return Pair.create(CommitLogPosition.serializer.deserialize(in), in.available() > 0 ? 
in.readLong() : Long.MIN_VALUE); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/WriteType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java index fdbe97d..11909e7 100644 --- a/src/java/org/apache/cassandra/db/WriteType.java +++ b/src/java/org/apache/cassandra/db/WriteType.java @@ -25,5 +25,6 @@ public enum WriteType COUNTER, BATCH_LOG, CAS, - VIEW; + VIEW, + CDC; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java new file mode 100644 index 0000000..b8f0a4e --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public abstract class AbstractCommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class); + + // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation. + private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>(); + + /** Segments that are ready to be used. 
Head of the queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>(); + + /** Active segments, containing unflushed data */ + private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>(); + + /** The segment we are currently allocating commit log records to */ + protected volatile CommitLogSegment allocatingFrom = null; + + private final WaitQueue hasAvailableSegments = new WaitQueue(); + + final String storageDirectory; + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). + */ + private final AtomicLong size = new AtomicLong(); + + /** + * New segment creation is initially disabled because we'll typically get some "free" segments + * recycled after log replay. + */ + volatile boolean createReserveSegments = false; + + // Used by tests to determine if segment manager is active or not. + volatile boolean processingTask = false; + + private Thread managerThread; + protected volatile boolean run = true; + protected final CommitLog commitLog; + + private static final SimpleCachedBufferPool bufferPool = + new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersPerPool(), DatabaseDescriptor.getCommitLogSegmentSize()); + + AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) + { + this.commitLog = commitLog; + this.storageDirectory = storageDirectory; + } + + void start() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (run) + { + try + { + Runnable task = segmentManagementTasks.poll(); + processingTask = true; + if (task == null) + { + // if we have no more work to do, check if we should create a new segment + if (!atSegmentLimit() && + availableSegments.isEmpty() && + (activeSegments.isEmpty() || createReserveSegments)) + { + logger.trace("No segments in reserve; creating a fresh one"); + // TODO : some error handling in case we fail to create a new segment + availableSegments.add(createSegment()); + hasAvailableSegments.signalAll(); + } + + // flush old Cfs if we're full + long unused = unusedCapacity(); + if (unused < 0) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); + long spaceToReclaim = 0; + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + segmentsToRecycle.add(segment); + spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); + if (spaceToReclaim + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + + // Since we're operating on a "null" allocation task, block here for the next task on the + // queue rather than looping, grabbing another null, and repeating the above work. 
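+ // Tasks are enqueued by wakeManager() (a no-op Runnable used purely as a wake-up) and by discardSegment(), so take() parks the manager thread until there is segment-management work to do.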
+ try + { + processingTask = false; + task = segmentManagementTasks.take(); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + task.run(); + processingTask = false; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + } + + private boolean atSegmentLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); + } + }; + + run = true; + + managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + } + + + /** + * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything. + */ + public abstract void shutdown(); + + /** + * Allocate a segment within this CLSM. Should either succeed or throw. + */ + public abstract Allocation allocate(Mutation mutation, int size); + + /** + * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM + * decide what to do with those segments on disk after they've been replayed. + */ + abstract void handleReplayedSegment(final File file); + + /** + * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task submitted + * to the segment manager so it's performed on the segment management thread. + */ + abstract CommitLogSegment createSegment(); + + /** + * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to the segment + * manager so it's performed on the segment management thread, or perform while the segment management thread is shut down + * during testing resets. + * + * @param segment segment to be discarded + * @param delete whether or not the segment is safe to be deleted. + */ + abstract void discard(CommitLogSegment segment, boolean delete); + + + /** + * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator + * is working on initial allocation of a CommitLogSegment. + */ + CommitLogSegment allocatingFrom() + { + CommitLogSegment r = allocatingFrom; + if (r == null) + { + advanceAllocatingFrom(null); + r = allocatingFrom; + } + return r; + } + + /** + * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it. + * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled. + * + * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. + */ + protected void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + CommitLogSegment next; + synchronized (this) + { + // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments + // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432 + if (allocatingFrom != old) + return; + next = availableSegments.poll(); + if (next != null) + { + allocatingFrom = next; + activeSegments.add(next); + } + } + + if (next != null) + { + if (old != null) + { + // Now we can run the user-defined command just after switching to the new commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + commitLog.requestExtraSync(); + return; + } + + // no more segments, so register to receive a signal when not empty + WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time()); + + // trigger the management thread; this must occur after registering + // the signal to ensure we are woken by any new segment creation + wakeManager(); + + // check if the queue has already been added to before waiting on the signal, to catch modifications + // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change + if (!availableSegments.isEmpty() || allocatingFrom != old) + { + signal.cancel(); + // if we've been beaten, just stop immediately + if (allocatingFrom != old) + return; + // otherwise try again, as there should be an available segment + continue; + } + + // can only reach here if the queue hasn't been inserted into + // before we registered the signal, as we only remove items from the queue + // after updating allocatingFrom. Can safely block until we are signalled + // by the allocator that new segments have been published + signal.awaitUninterruptibly(); + } + } + + protected void wakeManager() + { + // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary) + segmentManagementTasks.add(Runnables.doNothing()); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. + * + * Flushes any dirty CFs for this segment and any older segments, and then recycles + * the segments + */ + void forceRecycleAll(Iterable<UUID> droppedCfs) + { + List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // on the relevant keyspaces to complete + Keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future<?> future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) + segment.markClean(cfId, segment.getCurrentCommitLogPosition()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). + for (CommitLogSegment segment : activeSegments) + { + if (segment.isUnused()) + recycleSegment(segment); + } + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be recycled. 
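+ * Waits for any pending archiving of the segment (via maybeWaitForArchiving) before queueing the actual file discard onto the management thread.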
+ * + * @param segment segment that is no longer in use + */ + void recycleSegment(final CommitLogSegment segment) + { + boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); + if (activeSegments.remove(segment)) + { + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + discardSegment(segment, archiveSuccess); + } + else + { + logger.warn("segment {} not found in activeSegments queue", segment); + } + } + + /** + * Indicates that a segment file should be deleted. + * + * @param segment segment to be discarded + */ + private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) + { + logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script"); + segmentManagementTasks.add(() -> discard(segment, deleteFile)); + } + + /** + * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. + * @param addedSize + */ + void addSize(long addedSize) + { + size.addAndGet(addedSize); + } + + /** + * @return the space (in bytes) used by all segment files. + */ + public long onDiskSize() + { + return size.get(); + } + + private long unusedCapacity() + { + long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; + long currentSize = size.get(); + logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); + return total - currentSize; + } + + /** + * @param name the filename to check + * @return true if file is managed by this manager. + */ + public boolean manages(String name) + { + for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments)) + if (segment.getName().equals(name)) + return true; + return false; + } + + /** + * Sets a flag that enables the behavior of keeping at least one spare segment + * available at all times. + */ + void enableReserveSegmentCreation() + { + createReserveSegments = true; + wakeManager(); + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. + */ + private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish its append after the flush. + logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); + segment.markClean(dirtyCFId, segment.getCurrentCommitLogPosition()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ?
cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + * Only call this after the AbstractCommitLogService is shut down. + */ + public void stopUnsafe(boolean deleteSegments) + { + logger.trace("CLSM closing and clearing existing commit log segments..."); + createReserveSegments = false; + + awaitManagementTasksCompletion(); + + shutdown(); + try + { + awaitTermination(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + synchronized (this) + { + for (CommitLogSegment segment : activeSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + activeSegments.clear(); + + for (CommitLogSegment segment : availableSegments) + closeAndDeleteSegmentUnsafe(segment, deleteSegments); + availableSegments.clear(); + } + + allocatingFrom = null; + + segmentManagementTasks.clear(); + + size.set(0L); + + logger.trace("CLSM done with closing and clearing existing commit log segments."); + } + + // Used by tests only. + void awaitManagementTasksCompletion() + { + while (segmentManagementTasks.size() > 0 || processingTask) + Thread.yield(); + } + + /** + * Explicitly for use only during resets in unit testing. + */ + private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) + { + try + { + discard(segment, delete); + } + catch (AssertionError ignored) + { + // segment file does not exist + } + } + + /** + * Returns when the management thread terminates. + */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + + for (CommitLogSegment segment : activeSegments) + segment.close(); + + for (CommitLogSegment segment : availableSegments) + segment.close(); + + bufferPool.shutdown(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + @VisibleForTesting + public Collection<CommitLogSegment> getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } + + /** + * @return the current CommitLogPosition of the active segment we're allocating from + */ + CommitLogPosition getCurrentPosition() + { + return allocatingFrom().getCurrentCommitLogPosition(); + } + + /** + * Forces a disk flush on the commit log files that need it. Blocking. + */ + public void sync(boolean syncAllSegments) throws IOException + { + CommitLogSegment current = allocatingFrom(); + for (CommitLogSegment segment : getActiveSegments()) + { + if (!syncAllSegments && segment.id > current.id) + return; + segment.sync(); + } + } + + /** + * Used by compressed and encrypted segments to share a buffer pool across the CLSM. 
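+ * The pool is static and is created with DatabaseDescriptor.getCommitLogMaxCompressionBuffersPerPool() and the configured commit log segment size (see the field initializer above).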
+ */ + SimpleCachedBufferPool getBufferPool() + { + return bufferPool; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 113d1ba..0ba4f55 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -29,7 +29,6 @@ import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; public abstract class AbstractCommitLogService { - private Thread thread; private volatile boolean shutdown = false; @@ -89,7 +88,7 @@ public abstract class AbstractCommitLogService // sync and signal long syncStarted = System.currentTimeMillis(); - //This is a target for Byteman in CommitLogSegmentManagerTest + // This is a target for Byteman in CommitLogSegmentManagerTest commitLog.sync(shutdown); lastSyncedAt = syncStarted; syncComplete.signalAll();
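To make the new truncation-record format concrete: SystemKeyspace above now serializes a CommitLogPosition, followed by the truncation timestamp, into a single blob stored in the system.local truncated_at map. Below is a minimal, self-contained sketch of that round trip; it assumes the position serializes as a long segment id followed by an int offset, and the class and method names are illustrative stand-ins rather than the actual CommitLogPosition.serializer.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative stand-in for CommitLogPosition: a segment id plus an offset within that segment.
public final class TruncationBlobSketch
{
    final long segmentId;
    final int position;

    TruncationBlobSketch(long segmentId, int position)
    {
        this.segmentId = segmentId;
        this.position = position;
    }

    // Assumed blob layout: long segment id, int offset, then the truncation timestamp in millis.
    static byte[] toBlob(TruncationBlobSketch pos, long truncatedAt) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeLong(pos.segmentId);
            out.writeInt(pos.position);
            out.writeLong(truncatedAt);
        }
        return bytes.toByteArray();
    }

    // Mirrors truncationRecordFromBlob: only read the trailing timestamp if bytes remain.
    public static void main(String[] args) throws IOException
    {
        byte[] blob = toBlob(new TruncationBlobSketch(42L, 1337), System.currentTimeMillis());
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob)))
        {
            TruncationBlobSketch pos = new TruncationBlobSketch(in.readLong(), in.readInt());
            long truncatedAt = in.available() > 0 ? in.readLong() : Long.MIN_VALUE;
            System.out.println("segment " + pos.segmentId + " @ " + pos.position + ", truncated at " + truncatedAt);
        }
    }
}

The "check in.available() before reading the trailing long" guard matches truncationRecordFromBlob, which falls back to Long.MIN_VALUE, presumably to tolerate records that carry only the position.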