fix specifying and altering crc_check_chance; patch by Marcus Eriksson and Aleksey Yeschenko, reviewed by Aleksey Yeschenko for CASSANDRA-5053
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1a66ee9a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1a66ee9a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1a66ee9a Branch: refs/heads/trunk Commit: 1a66ee9a94d67ae475ae287b280517aa9e3cc318 Parents: e792187 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Wed Dec 19 14:54:46 2012 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Dec 19 14:54:46 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 7 ++ .../cassandra/db/ColumnFamilyStoreMBean.java | 5 + .../io/compress/CompressedRandomAccessReader.java | 2 +- .../io/compress/CompressionParameters.java | 62 ++++++++++++-- 5 files changed, 67 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a66ee9a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cc75791..38d9d47 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.1.9 * fix multithreaded compaction deadlock (CASSANDRA-4492) + * fix specifying and altering crc_check_chance (CASSANDRA-5053) 1.1.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a66ee9a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 67a883d..8284d38 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -201,6 +201,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean metadata.compressionParameters = CompressionParameters.create(opts); } + public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException + { + for (SSTableReader sstable : table.getAllSSTables()) + if (sstable.compression) + sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance); + } + private ColumnFamilyStore(Table table, String columnFamilyName, IPartitioner partitioner, http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a66ee9a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 1d9959e..26da8be 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -215,6 +215,11 @@ public interface ColumnFamilyStoreMBean public void setCompressionParameters(Map<String,String> opts) throws ConfigurationException; /** + * Set new crc check chance + */ + public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException; + + /** * Disable automatic compaction. */ public void disableAutoCompaction(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a66ee9a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 5403120..3d3b95b 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -90,7 +90,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0); - if (metadata.parameters.crcChance > FBUtilities.threadLocalRandom().nextDouble()) + if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) { checksum.update(buffer, 0, validBufferBytes); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1a66ee9a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java index 9be71f6..05cc707 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java @@ -23,16 +23,22 @@ import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.AbstractSet; import java.util.Set; -import org.apache.avro.util.Utf8; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ConfigurationException; public class CompressionParameters { + private static final Logger logger = LoggerFactory.getLogger(CompressionParameters.class); + public final static int DEFAULT_CHUNK_LENGTH = 65536; public final static double DEFAULT_CRC_CHECK_CHANCE = 1.0; @@ -40,9 +46,11 @@ public class CompressionParameters public static final String CHUNK_LENGTH_KB = "chunk_length_kb"; public static final String CRC_CHECK_CHANCE = "crc_check_chance"; + public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE); + public final ICompressor sstableCompressor; private final Integer chunkLength; - public final double crcChance; + private volatile double crcCheckChance; public final Map<String, String> otherOptions; // Unrecognized options, can be use by the compressor public static CompressionParameters create(Map<? extends CharSequence, ? extends CharSequence> opts) throws ConfigurationException @@ -64,16 +72,53 @@ public class CompressionParameters public CompressionParameters(ICompressor sstableCompressor) { - this(sstableCompressor, null, Collections.<String, String>emptyMap()); + // can't try/catch as first statement in the constructor, thus repeating constructor code here. + this.sstableCompressor = sstableCompressor; + chunkLength = null; + otherOptions = Collections.emptyMap(); + crcCheckChance = DEFAULT_CRC_CHECK_CHANCE; } - public CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) + public CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException { this.sstableCompressor = sstableCompressor; this.chunkLength = chunkLength; this.otherOptions = otherOptions; - String chance = otherOptions.get(CRC_CHECK_CHANCE); - this.crcChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : Double.parseDouble(chance); + this.crcCheckChance = otherOptions.get(CRC_CHECK_CHANCE) == null + ? DEFAULT_CRC_CHECK_CHANCE + : parseCrcCheckChance(otherOptions.get(CRC_CHECK_CHANCE)); + } + + public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException + { + validateCrcCheckChance(crcCheckChance); + logger.debug("Setting crcCheckChance to {}", crcCheckChance); + this.crcCheckChance = crcCheckChance; + } + + public double getCrcCheckChance() + { + return this.crcCheckChance; + } + + private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException + { + try + { + double chance = Double.parseDouble(crcCheckChance); + validateCrcCheckChance(chance); + return chance; + } + catch (NumberFormatException e) + { + throw new ConfigurationException("crc_check_chance should be a double"); + } + } + + private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException + { + if (crcCheckChance < 0.0d || crcCheckChance > 1.0d) + throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0"); } public int chunkLength() @@ -111,7 +156,7 @@ public class CompressionParameters Method method = compressorClass.getMethod("create", Map.class); ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions); // Check for unknown options - Set<String> supportedOpts = compressor.supportedOptions(); + AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS); for (String provided : compressionOptions.keySet()) if (!supportedOpts.contains(provided)) throw new ConfigurationException("Unknown compression options " + provided); @@ -203,8 +248,7 @@ public class CompressionParameters } } - if (crcChance > 1.0d || crcChance < 0.0d) - throw new ConfigurationException("crc_check_chance should be between 0.0 to 1.0"); + validateCrcCheckChance(crcCheckChance); } public Map<String, String> asThriftOptions()