Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 aef71696e -> e25453ba5 refs/heads/trunk be8b9f299 -> fe15e6c3c
Moved crc_check_chance out of compression options Patch by Paulo Motta; reviewed by Sam Tunnicliffe for CASSANDRA-9839 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e25453ba Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e25453ba Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e25453ba Branch: refs/heads/cassandra-3.0 Commit: e25453ba55d35cac802719c6fb906460f13a9c36 Parents: aef7169 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Wed Sep 2 19:24:47 2015 -0300 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Sat Sep 19 11:38:55 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + bin/cqlsh.py | 2 +- ...ore-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar | Bin 2217119 -> 0 bytes ...ore-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar | Bin 0 -> 2282233 bytes ...iver-internal-only-3.0.0a2.post0-03085e6.zip | Bin 230830 -> 0 bytes ...iver-internal-only-3.0.0a2.post0-379b6f2.zip | Bin 0 -> 230837 bytes .../org/apache/cassandra/config/CFMetaData.java | 9 +- .../cql3/statements/TableAttributes.java | 26 ++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 61 ++++++++------ .../compress/CompressedRandomAccessReader.java | 10 ++- .../cassandra/io/compress/LZ4Compressor.java | 2 +- .../io/sstable/format/SSTableReader.java | 17 ++-- .../cassandra/io/util/IChecksummedFile.java | 27 ++++++ .../cassandra/io/util/ICompressedFile.java | 3 +- .../apache/cassandra/io/util/SegmentedFile.java | 15 +++- .../cassandra/schema/CompressionParams.java | 78 +++++------------ .../cassandra/schema/LegacySchemaMigrator.java | 8 +- .../apache/cassandra/schema/SchemaKeyspace.java | 4 + .../apache/cassandra/schema/TableParams.java | 24 +++++- .../compress/CompressedInputStream.java | 7 +- .../compress/CompressedStreamReader.java | 3 +- .../apache/cassandra/utils/DefaultInteger.java | 51 ----------- 
.../apache/cassandra/utils/DefaultValue.java | 51 +++++++++++ .../miscellaneous/CrcCheckChanceTest.java | 84 ++++++++++++------- .../compression/CompressedInputStreamTest.java | 3 +- 26 files changed, 308 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2c0cde2..e55fd0a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-rc1 + * Move crc_check_chance out of compression options (CASSANDRA-9839) * Fix descending iteration past end of BTreeSearchIterator (CASSANDRA-10301) * Transfer hints to a different node on decommission (CASSANDRA-10198) * Check partition keys for CAS operations during stmt validation (CASSANDRA-10338) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 94f3c37..924c35f 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -94,6 +94,8 @@ Upgrading - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated. The new options are `class` and `chunk_length_in_kb`. Disabling compression should now be done by setting the new option `enabled` to `false`. + - The compression option `crc_check_chance` became a top-level table option, but is currently + enforced only against tables with enabled compression. - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax has been deprecated since 2.1.0 and is being removed in 3.0.0. 
- The 'index_interval' option for 'CREATE TABLE' statements, which has been deprecated http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/bin/cqlsh.py ---------------------------------------------------------------------- diff --git a/bin/cqlsh.py b/bin/cqlsh.py index 41fa4fd..07968d0 100644 --- a/bin/cqlsh.py +++ b/bin/cqlsh.py @@ -1184,7 +1184,7 @@ class Shell(cmd.Cmd): return self.get_table_meta(ks, name) except ColumnFamilyNotFound: try: - return self.get_view_meta(ks, name) + return self.get_view_meta(ks, name) except MaterializedViewNotFound: raise ObjectNotFound("%r not found in keyspace %r" % (name, ks)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar deleted file mode 100644 index 0f01a28..0000000 Binary files a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar new file mode 100644 index 0000000..cf30f25 Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip deleted file mode 100644 index 
e672bd9..0000000 Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip new file mode 100644 index 0000000..5605ef7 Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 69bf6bf..00ca704 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -199,6 +199,12 @@ public final class CFMetaData return this; } + public CFMetaData crcCheckChance(double prop) + { + params = TableParams.builder(params).crcCheckChance(prop).build(); + return this; + } + public CFMetaData speculativeRetry(SpeculativeRetryParam prop) { params = TableParams.builder(params).speculativeRetry(prop).build(); @@ -270,7 +276,8 @@ public final class CFMetaData isIndex = cfName.contains("."); - assert partitioner != null; + assert partitioner != null : "This assertion failure is probably due to accessing Schema.instance " + + "from client-mode tools - See CASSANDRA-8143."; this.partitioner = partitioner; // A compact table should always have a clustering http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java index ed64f0d..9e7bbfe 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java +++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java @@ -73,7 +73,17 @@ public final class TableAttributes extends PropertyDefinitions builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION))); if (hasOption(Option.COMPRESSION)) + { + //crc_check_chance was "promoted" from a compression property to a top-level-property after #9839 + //so we temporarily accept it to be defined as a compression option, to maintain backwards compatibility + Map<String, String> compressionOpts = getMap(Option.COMPRESSION); + if (compressionOpts.containsKey(Option.CRC_CHECK_CHANCE.toString().toLowerCase())) + { + Double crcCheckChance = getDeprecatedCrcCheckChance(compressionOpts); + builder.crcCheckChance(crcCheckChance); + } builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION))); + } if (hasOption(Option.DCLOCAL_READ_REPAIR_CHANCE)) builder.dcLocalReadRepairChance(getDouble(Option.DCLOCAL_READ_REPAIR_CHANCE)); @@ -99,9 +109,25 @@ public final class TableAttributes extends PropertyDefinitions if (hasOption(Option.SPECULATIVE_RETRY)) builder.speculativeRetry(SpeculativeRetryParam.fromString(getString(Option.SPECULATIVE_RETRY))); + if (hasOption(Option.CRC_CHECK_CHANCE)) + builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE)); + return builder.build(); } + private Double getDeprecatedCrcCheckChance(Map<String, String> compressionOpts) + { + String value = compressionOpts.get(Option.CRC_CHECK_CHANCE.toString().toLowerCase()); + try + { + return Double.parseDouble(value); + } + catch (NumberFormatException e) + { + throw new SyntaxException(String.format("Invalid double value %s for crc_check_chance.'", value)); + } + } + private double getDouble(Option option) { String value = 
getString(option); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index cdb9770..a9a8f80 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -196,8 +196,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final ViewManager.ForStore viewManager; /* These are locally held copies to be changed from the config during runtime */ - private volatile DefaultInteger minCompactionThreshold; - private volatile DefaultInteger maxCompactionThreshold; + private volatile DefaultValue<Integer> minCompactionThreshold; + private volatile DefaultValue<Integer> maxCompactionThreshold; + private volatile DefaultValue<Double> crcCheckChance; + private final CompactionStrategyManager compactionStrategyManager; private volatile Directories directories; @@ -219,10 +221,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // only update these runtime-modifiable settings if they have not been modified. 
if (!minCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold()); + cfs.minCompactionThreshold = new DefaultValue(metadata.params.compaction.minCompactionThreshold()); if (!maxCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold()); + cfs.maxCompactionThreshold = new DefaultValue(metadata.params.compaction.maxCompactionThreshold()); + if (!crcCheckChance.isModified()) + for (ColumnFamilyStore cfs : concatWithIndexes()) + cfs.crcCheckChance = new DefaultValue(metadata.params.crcCheckChance); compactionStrategyManager.maybeReload(metadata); directories = compactionStrategyManager.getDirectories(); @@ -333,19 +338,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - // FIXME: this is wrong, JMX should never update live CFMetaData objects - public void setCrcCheckChance(double crcCheckChance) - { - try - { - metadata.params.compression.setCrcCheckChance(crcCheckChance); - } - catch (ConfigurationException e) - { - throw new IllegalArgumentException(e.getMessage()); - } - } - private ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, int generation, @@ -370,14 +362,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; this.keyspace = keyspace; - name = columnFamilyName; this.metadata = metadata; - this.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold()); - this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold()); this.directories = directories; - this.indexManager = new SecondaryIndexManager(this); - this.viewManager = keyspace.viewManager.forTable(metadata.cfId); - this.metric = new 
TableMetrics(this); + name = columnFamilyName; + minCompactionThreshold = new DefaultValue<>(metadata.params.compaction.minCompactionThreshold()); + maxCompactionThreshold = new DefaultValue<>(metadata.params.compaction.maxCompactionThreshold()); + crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance); + indexManager = new SecondaryIndexManager(this); + viewManager = keyspace.viewManager.forTable(metadata.cfId); + metric = new TableMetrics(this); fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; @@ -394,7 +387,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // compaction strategy should be created after the CFS has been prepared - this.compactionStrategyManager = new CompactionStrategyManager(this); + compactionStrategyManager = new CompactionStrategyManager(this); this.directories = this.compactionStrategyManager.getDirectories(); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) @@ -2041,6 +2034,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return compactionStrategyManager; } + public void setCrcCheckChance(double crcCheckChance) + { + try + { + TableParams.builder().crcCheckChance(crcCheckChance).build().validate(); + for (ColumnFamilyStore cfs : concatWithIndexes()) + cfs.crcCheckChance.set(crcCheckChance); + } + catch (ConfigurationException e) + { + throw new IllegalArgumentException(e.getMessage()); + } + } + + + public Double getCrcCheckChance() + { + return crcCheckChance.value(); + } + public void setCompactionThresholds(int minThreshold, int maxThreshold) { validateCompactionThresholds(minThreshold, maxThreshold); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java 
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 7294923..b2759e6 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -21,6 +21,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; import java.util.zip.Checksum; +import java.util.function.Supplier; import com.google.common.primitives.Ints; @@ -45,12 +46,14 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private ByteBuffer checksumBytes; + private final Supplier<Double> crcCheckChanceSupplier; protected CompressedRandomAccessReader(Builder builder) { super(builder); this.metadata = builder.metadata; this.checksum = metadata.checksumType.newInstance(); + crcCheckChanceSupplier = builder.crcCheckChanceSupplier; if (regions == null) { @@ -121,7 +124,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader buffer.flip(); } - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) { compressed.rewind(); metadata.checksumType.update( checksum, (compressed)); @@ -183,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader buffer.flip(); } - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) { compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); @@ -236,18 +239,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader public final static class Builder extends RandomAccessReader.Builder { private final CompressionMetadata metadata; + private final Supplier<Double> crcCheckChanceSupplier; public Builder(ICompressedFile file) { super(file.channel()); this.metadata = 
applyMetadata(file.getMetadata()); this.regions = file.regions(); + this.crcCheckChanceSupplier = file.getCrcCheckChanceSupplier(); } public Builder(ChannelProxy channel, CompressionMetadata metadata) { super(channel); this.metadata = applyMetadata(metadata); + this.crcCheckChanceSupplier = (() -> 1.0); //100% crc_check_chance } private CompressionMetadata applyMetadata(CompressionMetadata metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java index 069cc96..3a3b024 100644 --- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java +++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java @@ -127,7 +127,7 @@ public class LZ4Compressor implements ICompressor public Set<String> supportedOptions() { - return new HashSet<>(Arrays.asList(CompressionParams.CRC_CHECK_CHANCE)); + return new HashSet<>(); } public BufferType preferredBufferType() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index ebf28a4..c4ef239 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -655,12 +655,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache // here when we know we're being wired into the rest of the server infrastructure. 
keyCache = CacheService.instance.keyCache; - - // ensure secondary index compression metadata is linked to the parent metadata. - if (compression && metadata.isIndex()) + final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId); + if (cfs != null) { - getCompressionMetadata().parameters.setLiveMetadata( - Schema.instance.getCFMetaData(metadata.ksName, metadata.getParentColumnFamilyName())); + ifile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance); + dfile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance); } } @@ -1642,6 +1641,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return dfile.onDiskLength; } + @VisibleForTesting + public double getCrcCheckChance() + { + return dfile.getCrcCheckChanceSupplier().get(); + } + /** * Mark the sstable as obsolete, i.e., compacted into newer sstables. * @@ -2047,8 +2052,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { tidy.setup(this, trackHotness); this.readMeter = tidy.global.readMeter; - if (compression) - getCompressionMetadata().parameters.setLiveMetadata(metadata); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/IChecksummedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/IChecksummedFile.java b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java new file mode 100644 index 0000000..fa15a5e --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.util.function.Supplier; + +public interface IChecksummedFile +{ + public Supplier<Double> getCrcCheckChanceSupplier(); + public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/ICompressedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java index 43d37dc..c149fd1 100644 --- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java +++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java @@ -19,9 +19,10 @@ package org.apache.cassandra.io.util; import org.apache.cassandra.io.compress.CompressionMetadata; -public interface ICompressedFile +public interface ICompressedFile extends IChecksummedFile { ChannelProxy channel(); CompressionMetadata getMetadata(); MmappedRegions regions(); + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index c2a2374..2504ecd 100644 --- 
a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; +import java.util.function.Supplier; import com.google.common.util.concurrent.RateLimiter; @@ -48,7 +49,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail; * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for * each access to that segment. */ -public abstract class SegmentedFile extends SharedCloseableImpl +public abstract class SegmentedFile extends SharedCloseableImpl implements IChecksummedFile { public final ChannelProxy channel; public final int bufferSize; @@ -57,6 +58,8 @@ public abstract class SegmentedFile extends SharedCloseableImpl // This differs from length for compressed files (but we still need length for // SegmentIterator because offsets in the file are relative to the uncompressed size) public final long onDiskLength; + private Supplier<Double> crcCheckChanceSupplier = () -> 1.0; + /** * Use getBuilder to get a Builder to construct a SegmentedFile. 
@@ -134,6 +137,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl return reader; } + public Supplier<Double> getCrcCheckChanceSupplier() + { + return crcCheckChanceSupplier; + } + + public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier) + { + this.crcCheckChanceSupplier = crcCheckChanceSupplier; + } + public void dropPageCache(long before) { CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/CompressionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index a73fcd1..443b6f1 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.*; +import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -38,6 +39,8 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.utils.NoSpamLogger; import static java.lang.String.format; @@ -48,9 +51,9 @@ public final class CompressionParams private static volatile boolean hasLoggedSsTableCompressionWarning; private static volatile boolean hasLoggedChunkLengthWarning; + private static volatile boolean hasLoggedCrcCheckChanceWarning; public static final int DEFAULT_CHUNK_LENGTH = 65536; - public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0; public static final 
IVersionedSerializer<CompressionParams> serializer = new Serializer(); public static final String CLASS = "class"; @@ -61,18 +64,16 @@ public final class CompressionParams DEFAULT_CHUNK_LENGTH, Collections.emptyMap()); + private static final String CRC_CHECK_CHANCE_WARNING = "The option crc_check_chance was deprecated as a compression option. " + + "You should specify it as a top-level table option instead"; + @Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression"; @Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb"; - - public static final String CRC_CHECK_CHANCE = "crc_check_chance"; - - public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE); + @Deprecated public static final String CRC_CHECK_CHANCE = "crc_check_chance"; private final ICompressor sstableCompressor; private final Integer chunkLength; - private volatile double crcCheckChance; - private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor - private CFMetaData liveMetadata; + private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor public static CompressionParams fromMap(Map<String, String> opts) { @@ -153,8 +154,6 @@ public final class CompressionParams this.sstableCompressor = sstableCompressor; this.chunkLength = chunkLength; this.otherOptions = ImmutableMap.copyOf(otherOptions); - String chance = otherOptions.get(CRC_CHECK_CHANCE); - this.crcCheckChance = (chance == null) ? 
DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance); } public CompressionParams copy() @@ -162,23 +161,6 @@ public final class CompressionParams return new CompressionParams(sstableCompressor, chunkLength, otherOptions); } - public void setLiveMetadata(final CFMetaData liveMetadata) - { - if (liveMetadata == null) - return; - - this.liveMetadata = liveMetadata; - } - - public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException - { - validateCrcCheckChance(crcCheckChance); - this.crcCheckChance = crcCheckChance; - - if (liveMetadata != null && this != liveMetadata.params.compression) - liveMetadata.params.compression.setCrcCheckChance(crcCheckChance); - } - /** * Checks if compression is enabled. * @return {@code true} if compression is enabled, {@code false} otherwise. @@ -202,31 +184,6 @@ public final class CompressionParams return otherOptions; } - public double getCrcCheckChance() - { - return liveMetadata == null ? this.crcCheckChance : liveMetadata.params.compression.crcCheckChance; - } - - private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException - { - try - { - double chance = Double.parseDouble(crcCheckChance); - validateCrcCheckChance(chance); - return chance; - } - catch (NumberFormatException e) - { - throw new ConfigurationException("crc_check_chance should be a double"); - } - } - - private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException - { - if (crcCheckChance < 0.0d || crcCheckChance > 1.0d) - throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0"); - } - public int chunkLength() { return chunkLength == null ? 
DEFAULT_CHUNK_LENGTH : chunkLength; @@ -257,14 +214,23 @@ public final class CompressionParams return null; } + if (compressionOptions.containsKey(CRC_CHECK_CHANCE)) + { + if (!hasLoggedCrcCheckChanceWarning) + { + logger.warn(CRC_CHECK_CHANCE_WARNING); + hasLoggedCrcCheckChanceWarning = true; + } + compressionOptions.remove(CRC_CHECK_CHANCE); + } + try { Method method = compressorClass.getMethod("create", Map.class); ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions); // Check for unknown options - AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS); for (String provided : compressionOptions.keySet()) - if (!supportedOpts.contains(provided)) + if (!compressor.supportedOptions().contains(provided)) throw new ConfigurationException("Unknown compression options " + provided); return compressor; } @@ -363,7 +329,7 @@ public final class CompressionParams if (options.containsKey(CHUNK_LENGTH_KB)) { - if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning) + if (!hasLoggedChunkLengthWarning) { hasLoggedChunkLengthWarning = true; logger.warn(format("The %s option has been deprecated. 
You should use %s instead", @@ -474,8 +440,6 @@ public final class CompressionParams c >>= 1; } } - - validateCrcCheckChance(crcCheckChance); } public Map<String, String> asMap() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 0d5a040..f23ec0b 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -361,7 +361,13 @@ public final class LegacySchemaMigrator if (row.has("speculative_retry")) params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))); - params.compression(CompressionParams.fromMap(fromJsonMap(row.getString("compression_parameters")))); + Map<String, String> compressionParameters = fromJsonMap(row.getString("compression_parameters")); + String crcCheckChance = compressionParameters.remove("crc_check_chance"); + //crc_check_chance was promoted from a compression property to a top-level property + if (crcCheckChance != null) + params.crcCheckChance(Double.parseDouble(crcCheckChance)); + + params.compression(CompressionParams.fromMap(compressionParameters)); params.compaction(compactionFromRow(row)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 2376fad..fb97ca5 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -98,6 +98,7 @@ public final class SchemaKeyspace + "comment text," + 
"compaction frozen<map<text, text>>," + "compression frozen<map<text, text>>," + + "crc_check_chance double," + "dclocal_read_repair_chance double," + "default_time_to_live int," + "extensions frozen<map<text, blob>>," @@ -159,6 +160,7 @@ public final class SchemaKeyspace + "comment text," + "compaction frozen<map<text, text>>," + "compression frozen<map<text, text>>," + + "crc_check_chance double," + "dclocal_read_repair_chance double," + "default_time_to_live int," + "extensions frozen<map<text, blob>>," @@ -908,6 +910,7 @@ public final class SchemaKeyspace .add("min_index_interval", params.minIndexInterval) .add("read_repair_chance", params.readRepairChance) .add("speculative_retry", params.speculativeRetry.toString()) + .add("crc_check_chance", params.crcCheckChance) .frozenMap("caching", params.caching.asMap()) .frozenMap("compaction", params.compaction.asMap()) .frozenMap("compression", params.compression.asMap()) @@ -1149,6 +1152,7 @@ public final class SchemaKeyspace .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) .minIndexInterval(row.getInt("min_index_interval")) .readRepairChance(row.getDouble("read_repair_chance")) + .crcCheckChance(row.getDouble("crc_check_chance")) .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))); if (row.has("extensions")) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/TableParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java index 64e2c36..7e44e73 100644 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -46,7 +46,8 @@ public final class TableParams MEMTABLE_FLUSH_PERIOD_IN_MS, MIN_INDEX_INTERVAL, READ_REPAIR_CHANCE, - SPECULATIVE_RETRY; + SPECULATIVE_RETRY, + CRC_CHECK_CHANCE; @Override public String 
toString() @@ -63,11 +64,13 @@ public final class TableParams public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0; public static final int DEFAULT_MIN_INDEX_INTERVAL = 128; public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048; + public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0; public final String comment; public final double readRepairChance; public final double dcLocalReadRepairChance; public final double bloomFilterFpChance; + public final double crcCheckChance; public final int gcGraceSeconds; public final int defaultTimeToLive; public final int memtableFlushPeriodInMs; @@ -87,6 +90,7 @@ public final class TableParams bloomFilterFpChance = builder.bloomFilterFpChance == null ? builder.compaction.defaultBloomFilterFbChance() : builder.bloomFilterFpChance; + crcCheckChance = builder.crcCheckChance; gcGraceSeconds = builder.gcGraceSeconds; defaultTimeToLive = builder.defaultTimeToLive; memtableFlushPeriodInMs = builder.memtableFlushPeriodInMs; @@ -112,6 +116,7 @@ public final class TableParams .compaction(params.compaction) .compression(params.compression) .dcLocalReadRepairChance(params.dcLocalReadRepairChance) + .crcCheckChance(params.crcCheckChance) .defaultTimeToLive(params.defaultTimeToLive) .gcGraceSeconds(params.gcGraceSeconds) .maxIndexInterval(params.maxIndexInterval) @@ -148,6 +153,13 @@ public final class TableParams readRepairChance); } + if (crcCheckChance < 0 || crcCheckChance > 1.0) + { + fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)", + Option.CRC_CHECK_CHANCE, + crcCheckChance); + } + if (defaultTimeToLive < 0) fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive); @@ -190,6 +202,7 @@ public final class TableParams && readRepairChance == p.readRepairChance && dcLocalReadRepairChance == p.dcLocalReadRepairChance && bloomFilterFpChance == p.bloomFilterFpChance + && crcCheckChance == p.crcCheckChance && gcGraceSeconds == p.gcGraceSeconds 
&& defaultTimeToLive == p.defaultTimeToLive && memtableFlushPeriodInMs == p.memtableFlushPeriodInMs @@ -209,6 +222,7 @@ public final class TableParams readRepairChance, dcLocalReadRepairChance, bloomFilterFpChance, + crcCheckChance, gcGraceSeconds, defaultTimeToLive, memtableFlushPeriodInMs, @@ -229,6 +243,7 @@ public final class TableParams .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance) .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance) .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance) + .add(Option.CRC_CHECK_CHANCE.toString(), crcCheckChance) .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds) .add(Option.DEFAULT_TIME_TO_LIVE.toString(), defaultTimeToLive) .add(Option.MEMTABLE_FLUSH_PERIOD_IN_MS.toString(), memtableFlushPeriodInMs) @@ -248,6 +263,7 @@ public final class TableParams private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE; private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE; private Double bloomFilterFpChance; + public Double crcCheckChance = DEFAULT_CRC_CHECK_CHANCE; private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE; private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS; @@ -292,6 +308,12 @@ public final class TableParams return this; } + public Builder crcCheckChance(double val) + { + crcCheckChance = val; + return this; + } + public Builder gcGraceSeconds(int val) { gcGraceSeconds = val; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 0a118b2..ccd0ac5 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; import java.util.zip.Checksum; import com.google.common.collect.Iterators; @@ -41,6 +42,7 @@ public class CompressedInputStream extends InputStream private final CompressionInfo info; // chunk buffer private final BlockingQueue<byte[]> dataBuffer; + private final Supplier<Double> crcCheckChanceSupplier; // uncompressed bytes private byte[] buffer; @@ -65,13 +67,14 @@ public class CompressedInputStream extends InputStream * @param source Input source to read compressed data from * @param info Compression info */ - public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType) + public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier) { this.info = info; this.checksum = checksumType.newInstance(); this.buffer = new byte[info.parameters.chunkLength()]; // buffer is limited to store up to 1024 chunks this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024)); + this.crcCheckChanceSupplier = crcCheckChanceSupplier; new Thread(new Reader(source, info, dataBuffer)).start(); } @@ -111,7 +114,7 @@ public class CompressedInputStream extends InputStream totalCompressedBytesRead += compressed.length; // validate crc randomly - if (info.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble()) { checksum.update(compressed, 0, compressed.length - checksumBytes.length); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index f702e24..69c7b87 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -77,7 +77,8 @@ public class CompressedStreamReader extends StreamReader SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format); - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType()); + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, + inputVersion.compressedChecksumType(), cfs::getCrcCheckChance); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata)); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultInteger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/DefaultInteger.java b/src/java/org/apache/cassandra/utils/DefaultInteger.java deleted file mode 100644 index 2a3efc7..0000000 --- a/src/java/org/apache/cassandra/utils/DefaultInteger.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils; - - -public class DefaultInteger -{ - private final int originalValue; - private int currentValue; - - public DefaultInteger(int value) - { - originalValue = value; - currentValue = value; - } - - public int value() - { - return currentValue; - } - - public void set(int i) - { - currentValue = i; - } - - public void reset() - { - currentValue = originalValue; - } - - public boolean isModified() - { - return originalValue != currentValue; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/DefaultValue.java b/src/java/org/apache/cassandra/utils/DefaultValue.java new file mode 100644 index 0000000..5697ede --- /dev/null +++ b/src/java/org/apache/cassandra/utils/DefaultValue.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Holder for a value that remembers the value it was constructed with, so that
 * the current value can later be compared against, or reset to, that original.
 *
 * <p>The fields are plain (neither {@code volatile} nor guarded by a lock); this
 * class performs no synchronization of its own.
 *
 * @param <T> type of the held value
 */
public class DefaultValue<T>
{
    /** Value supplied at construction time; never changes afterwards. */
    private final T originalValue;

    /** Value currently in effect; starts out as {@link #originalValue}. */
    private T currentValue;

    /**
     * Creates a holder whose original and current values are both {@code value}.
     *
     * @param value the initial (and original) value; may be {@code null}
     */
    public DefaultValue(T value)
    {
        originalValue = value;
        currentValue = value;
    }

    /**
     * @return the value currently in effect
     */
    public T value()
    {
        return currentValue;
    }

    /**
     * Replaces the current value, leaving the original untouched.
     *
     * @param newValue the value to use from now on; may be {@code null}
     */
    public void set(T newValue)
    {
        currentValue = newValue;
    }

    /**
     * Restores the current value to the one supplied at construction time.
     */
    public void reset()
    {
        currentValue = originalValue;
    }

    /**
     * Tells whether the current value differs from the original one.
     *
     * <p>The comparison is by value ({@code equals}) and is null-safe. A reference
     * comparison would be wrong here: {@code T} is an object type, so an equal
     * value held in a different instance (e.g. a freshly boxed {@code Double})
     * would be reported as modified.
     *
     * @return {@code true} if the current value is not equal to the original
     */
    public boolean isModified()
    {
        // Fully qualified so that the fix does not require a new import in the file.
        return !java.util.Objects.equals(originalValue, currentValue);
    }
}
newFormat) throws Throwable { //Start with crc_check_chance of 99% - createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}"); + if (newFormat) + createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor'} AND crc_check_chance = 0.99;"); + else + createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}"); execute("CREATE INDEX foo ON %s(v)"); @@ -47,37 +64,37 @@ public class CrcCheckChanceTest extends CQLTester execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2"); execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2"); - ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable()); ColumnFamilyStore indexCfs = cfs.indexManager.getAllIndexColumnFamilyStores().iterator().next(); cfs.forceBlockingFlush(); - Assert.assertEquals(0.99, cfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); - Assert.assertEquals(0.99, indexCfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); + Assert.assertEquals(0.99, cfs.getCrcCheckChance()); + Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); + Assert.assertEquals(0.99, indexCfs.getCrcCheckChance()); + Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); //Test for stack overflow - cfs.setCrcCheckChance(0.99); + if (newFormat) + alterTable("ALTER TABLE %s WITH crc_check_chance = 0.99"); + else + alterTable("ALTER TABLE %s WITH 
compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.99}"); assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"), - row("p1", "k1", "sv1", "v1"), - row("p1", "k2", "sv1", "v2") + row("p1", "k1", "sv1", "v1"), + row("p1", "k2", "sv1", "v2") ); assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"), - row("p1", "k1", "sv1", "v1") + row("p1", "k1", "sv1", "v1") ); //Write a few SSTables then Compact - execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1"); execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2"); execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2"); cfs.forceBlockingFlush(); - execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1"); execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2"); execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2"); @@ -89,34 +106,45 @@ public class CrcCheckChanceTest extends CQLTester execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2"); cfs.forceBlockingFlush(); - cfs.forceMajorCompaction(); - //Verify when we alter the value the live sstable readers hold the new one - alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.01}"); + //Now let's change via JMX + cfs.setCrcCheckChance(0.01); - Assert.assertEquals( 0.01, cfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals( 0.01, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); - Assert.assertEquals( 0.01, indexCfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals( 0.01, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); + Assert.assertEquals(0.01, cfs.getCrcCheckChance()); + Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); + Assert.assertEquals(0.01, indexCfs.getCrcCheckChance()); + Assert.assertEquals(0.01, 
indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"), - row("p1", "k1", "sv1", "v1"), - row("p1", "k2", "sv1", "v2") + row("p1", "k1", "sv1", "v1"), + row("p1", "k2", "sv1", "v2") ); assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"), - row("p1", "k1", "sv1", "v1") + row("p1", "k1", "sv1", "v1") ); + //Alter again via schema + if (newFormat) + alterTable("ALTER TABLE %s WITH crc_check_chance = 0.5"); + else + alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.5}"); + + //We should be able to get the new value by accessing directly the schema metadata + Assert.assertEquals(0.5, cfs.metadata.params.crcCheckChance); + + //but previous JMX-set value will persist until next restart + Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); + Assert.assertEquals(0.01, indexCfs.getCrcCheckChance()); + Assert.assertEquals(0.01, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); //Verify the call used by JMX still works cfs.setCrcCheckChance(0.03); - Assert.assertEquals( 0.03, cfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals( 0.03, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); - Assert.assertEquals( 0.03, indexCfs.metadata.params.compression.getCrcCheckChance()); - Assert.assertEquals( 0.03, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); - + Assert.assertEquals(0.03, cfs.getCrcCheckChance()); + Assert.assertEquals(0.03, cfs.getLiveSSTables().iterator().next().getCrcCheckChance()); + Assert.assertEquals(0.03, indexCfs.getCrcCheckChance()); + Assert.assertEquals(0.03, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance()); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index 30670fb..db05a3e 100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@ -108,7 +108,8 @@ public class CompressedInputStreamTest // read buffer using CompressedInputStream CompressionInfo info = new CompressionInfo(chunks, param); - CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32); + CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, + ChecksumType.CRC32, () -> 1.0); DataInputStream in = new DataInputStream(input); for (int i = 0; i < sections.size(); i++)