This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 99d034a224 Option to disable CDC on SSTable repair 99d034a224 is described below commit 99d034a2245c44becb6a730c77ad51ab9340f3a7 Author: Yifan Cai <y...@apache.org> AuthorDate: Mon Jun 6 13:15:33 2022 -0700 Option to disable CDC on SSTable repair patch by Yifan Cai; reviewed by Josh McKenzie for CASSANDRA-17666 --- CHANGES.txt | 1 + NEWS.txt | 6 ++ conf/cassandra.yaml | 12 +++ src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 10 +++ .../apache/cassandra/db/commitlog/CommitLog.java | 29 +++++-- .../cassandra/db/commitlog/CommitLogMBean.java | 6 ++ .../db/streaming/CassandraStreamReceiver.java | 25 ++++-- .../test/cdc/ToggleCDCOnRepairEnabledTest.java | 97 ++++++++++++++++++++++ 9 files changed, 175 insertions(+), 14 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index d6b4ff5ab9..9e31bd96e6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Option to disable CDC writes of repaired data (CASSANDRA-17666) * When a node is bootstrapping it gets the whole gossip state but applies in random order causing some cases where StorageService will fail causing an instance to not show up in TokenMetadata (CASSANDRA-17676) * Add CQLSH command SHOW REPLICAS (CASSANDRA-17577) * Add guardrail to allow disabling of SimpleStrategy (CASSANDRA-17647) diff --git a/NEWS.txt b/NEWS.txt index 5a52c6e3ba..996113d7c7 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -57,6 +57,12 @@ using the provided 'sstableupgrade' tool. New features ------------ + - Added a new configuration cdc_on_repair_enabled to toggle whether CDC mutations are replayed through the + write path on streaming, e.g. repair. When enabled, CDC data streamed to the destination node will be written into + commit log first. When disabled, the streamed CDC data is written into SSTables just the same as normal streaming. 
+ If this is set to false, streaming will be considerably faster; however, it's possible that, in extreme situations + (losing > quorum # nodes in a replica set), you may have data in your SSTables that never makes it to the CDC log. + The default is true/enabled. The configuration can be altered via JMX. - Added a new CQL function, maxwritetime. It shows the largest unix timestamp at which the data was written, similar to its sibling CQL function, writetime. Unlike writetime, maxwritetime can be applied to multi-cell data types, e.g. non-frozen collections and UDT, and returns the largest timestamp. One should not use it when upgrading to 4.2. diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 491740f012..3bab6712c8 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -298,6 +298,18 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner # containing a CDC-enabled table if at space limit in cdc_raw_directory). cdc_enabled: false +# Specify whether writes to the CDC-enabled tables should be blocked when CDC data on disk has reached the limit. +# When set to false, the writes will not be blocked and the oldest CDC data on disk will be deleted to +# ensure the size constraint. The default is true. +# cdc_block_writes: true + +# Specify whether CDC mutations are replayed through the write path on streaming, e.g. repair. +# When enabled, CDC data streamed to the destination node will be written into the commit log first. When set to false, +# the streamed CDC data is written into SSTables just the same as normal streaming. The default is true. +# If this is set to false, streaming will be considerably faster; however, it's possible that, in extreme situations +# (losing > quorum # nodes in a replica set), you may have data in your SSTables that never makes it to the CDC log. +# cdc_on_repair_enabled: true + # CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the # segment contains mutations for a CDC-enabled table.
This should be placed on a # separate spindle than the data directories. If not set, the default directory is diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index c3c5b3582c..3d2dbb7b40 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -380,6 +380,9 @@ public class Config // When true, new CDC mutations are rejected/blocked when reaching max CDC storage. // When false, new CDC mutations can always be added. But it will remove the oldest CDC commit log segment on full. public volatile boolean cdc_block_writes = true; + // When true, CDC data in streamed SSTables goes through the commit log during inter-node streaming, e.g. repair. + // When false, it behaves the same as normal streaming. + public volatile boolean cdc_on_repair_enabled = true; public String cdc_raw_directory; @Replaces(oldName = "cdc_total_space_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true) public DataStorageSpec.IntMebibytesBound cdc_total_space = new DataStorageSpec.IntMebibytesBound("0MiB"); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 16b5c4b78d..8151c96871 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3486,6 +3486,16 @@ public class DatabaseDescriptor conf.cdc_block_writes = val; } + public static boolean isCDCOnRepairEnabled() + { + return conf.cdc_on_repair_enabled; + } + + public static void setCDCOnRepairEnabled(boolean val) + { + conf.cdc_on_repair_enabled = val; + } + public static String getCDCLogLocation() { return conf.cdc_raw_directory; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 37df1f9451..87426122fe 100644 ---
a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -429,11 +429,7 @@ public class CommitLog implements CommitLogMBean @Override public void setCDCBlockWrites(boolean val) { - Preconditions.checkState(DatabaseDescriptor.isCDCEnabled(), - "Unable to set block_writes (%s): CDC is not enabled.", val); - Preconditions.checkState(segmentManager instanceof CommitLogSegmentManagerCDC, - "CDC is enabled but we have the wrong CommitLogSegmentManager type: %s. " + - "Please report this as bug.", segmentManager.getClass().getName()); + ensureCDCEnabled("Unable to set block_writes."); boolean oldVal = DatabaseDescriptor.getCDCBlockWrites(); CommitLogSegment currentSegment = segmentManager.allocatingFrom(); // Update the current segment CDC state to PERMITTED if block_writes is disabled now, and it was in FORBIDDEN state @@ -443,6 +439,29 @@ public class CommitLog implements CommitLogMBean logger.info("Updated CDC block_writes from {} to {}", oldVal, val); } + + @Override + public boolean isCDCOnRepairEnabled() + { + return DatabaseDescriptor.isCDCOnRepairEnabled(); + } + + @Override + public void setCDCOnRepairEnabled(boolean value) + { + ensureCDCEnabled("Unable to set cdc_on_repair_enabled."); + DatabaseDescriptor.setCDCOnRepairEnabled(value); + logger.info("Set cdc_on_repair_enabled to {}", value); + } + + private void ensureCDCEnabled(String hint) + { + Preconditions.checkState(DatabaseDescriptor.isCDCEnabled(), "CDC is not enabled. %s", hint); + Preconditions.checkState(segmentManager instanceof CommitLogSegmentManagerCDC, + "CDC is enabled but we have the wrong CommitLogSegmentManager type: %s. " + + "Please report this as bug.", segmentManager.getClass().getName()); + } + /** * Shuts down the threads used by the commit log, blocking until completion. 
* TODO this should accept a timeout, and throw TimeoutException diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java index 7e8deca9b0..189916c66c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java @@ -88,4 +88,10 @@ public interface CommitLogMBean public boolean getCDCBlockWrites(); public void setCDCBlockWrites(boolean val); + + /** Returns true if inter-node streaming of CDC data should go through the write path */ + boolean isCDCOnRepairEnabled(); + + /** Sets whether CDC data streamed between nodes, e.g. during repair, goes through the write path */ + void setCDCOnRepairEnabled(boolean value); } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 48de8b54fc..b5963978c0 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -25,18 +25,16 @@ import java.util.Set; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; - -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; -import org.apache.cassandra.io.sstable.SSTable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; @@ -45,6 +43,7 @@
import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.streaming.IncomingStream; @@ -172,23 +171,31 @@ public class CassandraStreamReceiver implements StreamReceiver return cfs.metadata().params.cdc; } + // returns true iff it is a cdc table and cdc on repair is enabled. + private boolean cdcRequiresWriteCommitLog(ColumnFamilyStore cfs) + { + return DatabaseDescriptor.isCDCOnRepairEnabled() && hasCDC(cfs); + } + /* * We have a special path for views and for CDC. * * For views, since the view requires cleaning up any pre-existing state, we must put all partitions * through the same write path as normal mutations. This also ensures any 2is are also updated. * - * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they - * can be archived by the CDC process on discard. + * For CDC-enabled tables, when cdc_on_repair_enabled is true, we want to ensure that the mutations are + * run through the CommitLog, so they can be archived by the CDC process on discard.
*/ private boolean requiresWritePath(ColumnFamilyStore cfs) { - return hasCDC(cfs) || cfs.streamToMemtable() || (session.streamOperation().requiresViewBuild() && hasViews(cfs)); + return cdcRequiresWriteCommitLog(cfs) + || cfs.streamToMemtable() + || (session.streamOperation().requiresViewBuild() && hasViews(cfs)); } private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { - boolean hasCdc = hasCDC(cfs); + boolean writeCDCCommitLog = cdcRequiresWriteCommitLog(cfs); ColumnFilter filter = ColumnFilter.all(cfs.metadata()); for (SSTableReader reader : readers) { @@ -206,7 +213,7 @@ public class CassandraStreamReceiver implements StreamReceiver // If the CFS has CDC, however, these updates need to be written to the CommitLog // so they get archived into the cdc_raw folder ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)), - hasCdc, + writeCDCCommitLog, true, false); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cdc/ToggleCDCOnRepairEnabledTest.java b/test/distributed/org/apache/cassandra/distributed/test/cdc/ToggleCDCOnRepairEnabledTest.java new file mode 100644 index 0000000000..499cf076af --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/cdc/ToggleCDCOnRepairEnabledTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.cdc; + +import java.util.function.Consumer; + +import org.junit.Test; + +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogSegment; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertTrue; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; + +public class ToggleCDCOnRepairEnabledTest extends TestBaseImpl +{ + @Test + public void testCDCOnRepairIsEnabled() throws Exception + { + testCDCOnRepairEnabled(true, cluster -> { + cluster.get(2).runOnInstance(() -> { + boolean containCDCInLog = CommitLog.instance.segmentManager + .getActiveSegments() + .stream() + .anyMatch(s -> s.getCDCState() == CommitLogSegment.CDCState.CONTAINS); + assertTrue("Mutation should be added to commit log when cdc_on_repair_enabled is true", + containCDCInLog); + }); + }); + } + + @Test + public void testCDCOnRepairIsDisabled() throws Exception + { + testCDCOnRepairEnabled(false, cluster -> { + cluster.get(2).runOnInstance(() -> { + boolean containCDCInLog = CommitLog.instance.segmentManager + .getActiveSegments() + .stream() + .allMatch(s -> s.getCDCState() != CommitLogSegment.CDCState.CONTAINS); + assertTrue("No mutation should be added to commit log when cdc_on_repair_enabled is false", + 
containCDCInLog); + }); + }); + } + + // test helper to repair data between nodes when cdc_on_repair_enabled is on or off. + private void testCDCOnRepairEnabled(boolean enabled, Consumer<Cluster> assertion) throws Exception + { + try (Cluster cluster = init(Cluster.build(2) + .withConfig(c -> c.set("cdc_enabled", true) + .set("cdc_on_repair_enabled", enabled) + .with(Feature.NETWORK) + .with(Feature.GOSSIP)) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k INT PRIMARY KEY, v INT) WITH cdc=true")); + + // Data only in node1 + cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)")); + Object[][] result = cluster.get(1).executeInternal(withKeyspace("SELECT * FROM %s.tbl WHERE k = 1")); + assertRows(result, row(1, 1)); + result = cluster.get(2).executeInternal(withKeyspace("SELECT * FROM %s.tbl WHERE k = 1")); + assertRows(result); + + // repair + cluster.get(1).flush(KEYSPACE); + cluster.get(2).nodetool("repair", KEYSPACE, "tbl"); + + // verify node2 now has data + result = cluster.get(2).executeInternal(withKeyspace("SELECT * FROM %s.tbl WHERE k = 1")); + assertRows(result, row(1, 1)); + + assertion.accept(cluster); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org