Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/413e48e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/413e48e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/413e48e6 Branch: refs/heads/cassandra-3.0 Commit: 413e48e6571e3c23362d5053e0c7fcdd99bd1e7d Parents: 5cebd1f c70ce63 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Sep 29 14:32:49 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Sep 29 14:32:49 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../db/compaction/CompactionManager.java | 6 +- .../org/apache/cassandra/repair/Validator.java | 9 +- .../org/apache/cassandra/utils/MerkleTree.java | 10 ++ .../org/apache/cassandra/utils/MerkleTrees.java | 10 ++ .../db/compaction/CompactionsTest.java | 2 +- .../apache/cassandra/repair/ValidatorTest.java | 167 ++++++++++++------- 7 files changed, 146 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f2f8dac,97bc70a..9076e7a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,77 -1,13 +1,78 @@@ -2.2.9 +3.0.10 + * Fix failure in LogTransactionTest (CASSANDRA-12632) + * Fix potentially incomplete non-frozen UDT values when querying with the + full primary key specified (CASSANDRA-12605) + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670) + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060) + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516) + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472) + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499) + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681) +Merged from 2.2: + * Fix merkle tree depth calculation (CASSANDRA-12580) + * Make Collections deserialization more robust (CASSANDRA-12618) - - -2.2.8 * Fix exceptions when enabling gossip on nodes that haven't joined the ring (CASSANDRA-12253) * Fix authentication problem when invoking clqsh copy from a SOURCE command (CASSANDRA-12642) * Decrement pending range calculator jobs counter in finally block (CASSANDRA-12554) +Merged from 2.1: - * Make Collections deserialization more robust (CASSANDRA-12618) + * Add system property to set the max number of native transport requests in queue (CASSANDRA-11363) + + +3.0.9 + * Handle composite prefixes with final EOC=0 as in 2.x and refactor LegacyLayout.decodeBound (CASSANDRA-12423) + * Fix paging for 2.x to 3.x upgrades (CASSANDRA-11195) + * select_distinct_with_deletions_test failing on non-vnode environments (CASSANDRA-11126) + * Stack Overflow returned to queries while upgrading (CASSANDRA-12527) + * Fix legacy regex for temporary files from 2.2 (CASSANDRA-12565) + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208) + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889) + * Fix file handle leaks due to simultaneous compaction/repair and + listing snapshots, calculating snapshot sizes, or making schema + changes (CASSANDRA-11594) + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508) + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504) + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385) + * Calculate last compacted key on startup (CASSANDRA-6216) + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190) + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436) + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331) + * Backport CASSANDRA-12002 (CASSANDRA-12177) + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100) + * Fix potential bad messaging service message for paged range reads + within mixed-version 3.x clusters (CASSANDRA-12249) + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828) + * NullPointerException during compaction on table with static columns (CASSANDRA-12336) + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823) + * Fix upgrade of super columns on thrift (CASSANDRA-12335) + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359) + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277) + * Exception when computing read-repair for range tombstones (CASSANDRA-12263) + * Lost counter writes in compact table and static columns (CASSANDRA-12219) + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247) + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980) + * Add option to override compaction space check (CASSANDRA-12180) + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114) + * Respond with v1/v2 protocol header when responding to driver that attempts + to connect with too low of a protocol version (CASSANDRA-11464) + * NullPointerExpception when reading/compacting table (CASSANDRA-11988) + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144) + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393) + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315) + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733) + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098) + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996) + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944) + * Fix column ordering of results with static columns for Thrift requests in + a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of + those static columns in query results (CASSANDRA-12123) + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090) + * Fix EOF exception when altering column type (CASSANDRA-11820) + * Fix JsonTransformer output of partition with deletion info (CASSANDRA-12418) + * Fix NPE in SSTableLoader when specifying partial directory path (CASSANDRA-12609) +Merged from 2.2: * Add local address entry in PropertyFileSnitch (CASSANDRA-11332) * cqlshlib tests: increase default execute timeout (CASSANDRA-12481) * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523) http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 99e0fd5,78fa23c..4d1757e --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1094,34 -1102,40 +1094,33 @@@ public class CompactionManager implemen if (validator.gcBefore > 0) gcBefore = validator.gcBefore; else - gcBefore = getDefaultGcBefore(cfs); + gcBefore = getDefaultGcBefore(cfs, nowInSec); } - // Create Merkle tree suitable to hold estimated partitions for given range. - // We blindly assume that partition is evenly distributed on all sstables for now. - long numPartitions = 0; - for (SSTableReader sstable : sstables) - { - numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range)); - } - // determine tree depth from number of partitions, but cap at 20 to prevent large tree (CASSANDRA-5263) - int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), 20) : 0; - MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); - + // Create Merkle trees suitable to hold estimated partitions for the given ranges. + // We blindly assume that a partition is evenly distributed on all sstables for now. - // determine tree depth from number of partitions, but cap at 20 to prevent large tree. + MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs); long start = System.nanoTime(); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range)) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); + ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); + CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) { - CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - metrics.beginCompaction(ci); - try + // validate the CF as we iterate over it + validator.prepare(cfs, tree); + while (ci.hasNext()) { - // validate the CF as we iterate over it - validator.prepare(cfs, tree); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + try (UnfilteredRowIterator partition = ci.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - AbstractCompactedRow row = iter.next(); - validator.add(row); + validator.add(partition); } - validator.complete(); } - finally + validator.complete(); + } + finally + { + if (isSnapshotValidation && !isGlobalSnapshotValidation) { // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction // is done). @@@ -1144,37 -1167,6 +1143,40 @@@ } } + private static MerkleTrees createMerkleTrees(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, ColumnFamilyStore cfs) + { + MerkleTrees tree = new MerkleTrees(cfs.getPartitioner()); + long allPartitions = 0; + Map<Range<Token>, Long> rangePartitionCounts = new HashMap<>(); + for (Range<Token> range : ranges) + { + long numPartitions = 0; + for (SSTableReader sstable : sstables) + numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(range)); + rangePartitionCounts.put(range, numPartitions); + allPartitions += numPartitions; + } + + for (Range<Token> range : ranges) + { + long numPartitions = rangePartitionCounts.get(range); + double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0; ++ // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress, ++ // capping at 20 to prevent large tree (CASSANDRA-11390) + int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0; - int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), maxDepth) : 0; ++ // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263) ++ int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0; + tree.addMerkleTree((int) Math.pow(2, depth), range); + } + if (logger.isDebugEnabled()) + { + // MT serialize may take time + logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, MerkleTrees.serializer.serializedSize(tree, 0)); + } + + return tree; + } + private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator) { Refs<SSTableReader> sstables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/Validator.java index 217c9de,8dbb4cf..9baa358 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@@ -77,13 -79,14 +83,14 @@@ public class Validator implements Runna validated = 0; range = null; ranges = null; + this.evenTreeDistribution = evenTreeDistribution; } - public void prepare(ColumnFamilyStore cfs, MerkleTree tree) + public void prepare(ColumnFamilyStore cfs, MerkleTrees tree) { - this.tree = tree; + this.trees = tree; - if (!tree.partitioner().preservesOrder()) + if (!tree.partitioner().preservesOrder() || evenTreeDistribution) { // You can't beat an even tree distribution for md5 tree.init(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTree.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/src/java/org/apache/cassandra/utils/MerkleTrees.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/MerkleTrees.java index b950b3b,0000000..4ae55ab mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/utils/MerkleTrees.java +++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java @@@ -1,436 -1,0 +1,446 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.*; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.PeekingIterator; +import org.slf4j.Logger; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + + +/** + * Wrapper class for handling of multiple MerkleTrees at once. + * + * The MerkleTree's are divided in Ranges of non-overlapping tokens. + */ +public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>> +{ + public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer(); + + private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator()); + + private IPartitioner partitioner; + + /** + * Creates empty MerkleTrees object. + * + * @param partitioner The partitioner to use + */ + public MerkleTrees(IPartitioner partitioner) + { + this(partitioner, new ArrayList<>()); + } + + private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees) + { + this.partitioner = partitioner; + addTrees(merkleTrees); + } + + /** + * Get the ranges that these merkle trees covers. + * + * @return + */ + public Collection<Range<Token>> ranges() + { + return merkleTrees.keySet(); + } + + /** + * Get the partitioner in use. + * + * @return + */ + public IPartitioner partitioner() + { + return partitioner; + } + + /** + * Add merkle tree's with the defined maxsize and ranges. + * + * @param maxsize + * @param ranges + */ + public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges) + { + for (Range<Token> range : ranges) + { + addMerkleTree(maxsize, range); + } + } + + /** + * Add a MerkleTree with the defined size and range. + * + * @param maxsize + * @param range + * @return The created merkle tree. + */ + public MerkleTree addMerkleTree(int maxsize, Range<Token> range) + { + return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range); + } + + @VisibleForTesting + public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range) + { + MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize); + addTree(tree); + + return tree; + } + + /** + * Get the MerkleTree.Range responsible for the given token. + * + * @param t + * @return + */ + @VisibleForTesting + public MerkleTree.TreeRange get(Token t) + { + return getMerkleTree(t).get(t); + } + + /** + * Init all MerkleTree's with an even tree distribution. + */ + public void init() + { + for (Range<Token> range : merkleTrees.keySet()) + { + init(range); + } + } + + /** + * Init a selected MerkleTree with an even tree distribution. + * + * @param range + */ + public void init(Range<Token> range) + { + merkleTrees.get(range).init(); + } + + /** + * Split the MerkleTree responsible for the given token. + * + * @param t + * @return + */ + public boolean split(Token t) + { + return getMerkleTree(t).split(t); + } + + /** + * Invalidate the MerkleTree responsible for the given token. + * + * @param t + */ + @VisibleForTesting + public void invalidate(Token t) + { + getMerkleTree(t).invalidate(t); + } + + /** + * Get the MerkleTree responsible for the given token range. + * + * @param range + * @return + */ + public MerkleTree getMerkleTree(Range<Token> range) + { + return merkleTrees.get(range); + } + + public long size() + { + long size = 0; + + for (MerkleTree tree : merkleTrees.values()) + { + size += tree.size(); + } + + return size; + } + + @VisibleForTesting + public void maxsize(Range<Token> range, int maxsize) + { + getMerkleTree(range).maxsize(maxsize); + } + + /** + * Get the MerkleTree responsible for the given token. + * + * @param t + * @return The given MerkleTree or null if none exist. + */ + private MerkleTree getMerkleTree(Token t) + { + for (Range<Token> range : merkleTrees.keySet()) + { + if (range.contains(t)) + return merkleTrees.get(range); + } + + throw new AssertionError("Expected tree for token " + t); + } + + private void addTrees(Collection<MerkleTree> trees) + { + for (MerkleTree tree : trees) + { + addTree(tree); + } + } + + private void addTree(MerkleTree tree) + { + assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range"; + + merkleTrees.put(tree.fullRange, tree); + } + + private boolean validateNonOverlapping(MerkleTree tree) + { + for (Range<Token> range : merkleTrees.keySet()) + { + if (tree.fullRange.intersects(range)) + return false; + } + + return true; + } + + /** + * Get an iterator for all the invalids generated by the MerkleTrees. + * + * @return + */ + public TreeRangeIterator invalids() + { + return new TreeRangeIterator(); + } + + /** + * Log the row count per leaf for all MerkleTrees. + * + * @param logger + */ + public void logRowCountPerLeaf(Logger logger) + { + for (MerkleTree tree : merkleTrees.values()) + { + tree.histogramOfRowCountPerLeaf().log(logger); + } + } + + /** + * Log the row size per leaf for all MerkleTrees. + * + * @param logger + */ + public void logRowSizePerLeaf(Logger logger) + { + for (MerkleTree tree : merkleTrees.values()) + { + tree.histogramOfRowSizePerLeaf().log(logger); + } + } + + @VisibleForTesting + public byte[] hash(Range<Token> range) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + boolean hashed = false; + + try + { + for (Range<Token> rt : merkleTrees.keySet()) + { + if (rt.intersects(range)) + { + byte[] bytes = merkleTrees.get(rt).hash(range); + if (bytes != null) + { + baos.write(bytes); + hashed = true; + } + } + } + } + catch (IOException e) + { + throw new RuntimeException("Unable to append merkle tree hash to result"); + } + + return hashed ? baos.toByteArray() : null; + } + + /** + * Get an iterator of all ranges and their MerkleTrees. + */ + public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator() + { + return merkleTrees.entrySet().iterator(); + } + ++ public long rowCount() ++ { ++ long totalCount = 0; ++ for (MerkleTree tree : merkleTrees.values()) ++ { ++ totalCount += tree.rowCount(); ++ } ++ return totalCount; ++ } ++ + public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements + Iterable<MerkleTree.TreeRange>, + PeekingIterator<MerkleTree.TreeRange> + { + private final Iterator<MerkleTree> it; + + private MerkleTree.TreeRangeIterator current = null; + + private TreeRangeIterator() + { + it = merkleTrees.values().iterator(); + } + + public MerkleTree.TreeRange computeNext() + { + if (current == null || !current.hasNext()) + return nextIterator(); + + return current.next(); + } + + private MerkleTree.TreeRange nextIterator() + { + if (it.hasNext()) + { + current = it.next().invalids(); + + return current.next(); + } + + return endOfData(); + } + + public Iterator<MerkleTree.TreeRange> iterator() + { + return this; + } + } + + /** + * Get the differences between the two sets of MerkleTrees. + * + * @param ltree + * @param rtree + * @return + */ + public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree) + { + List<Range<Token>> differences = new ArrayList<>(); + for (MerkleTree tree : ltree.merkleTrees.values()) + { + differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange))); + } + return differences; + } + + public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees> + { + public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException + { + out.writeInt(trees.merkleTrees.size()); + for (MerkleTree tree : trees.merkleTrees.values()) + { + MerkleTree.serializer.serialize(tree, out, version); + } + } + + public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException + { + IPartitioner partitioner = null; + int nTrees = in.readInt(); + Collection<MerkleTree> trees = new ArrayList<>(nTrees); + if (nTrees > 0) + { + for (int i = 0; i < nTrees; i++) + { + MerkleTree tree = MerkleTree.serializer.deserialize(in, version); + trees.add(tree); + + if (partitioner == null) + partitioner = tree.partitioner(); + else + assert tree.partitioner() == partitioner; + } + } + + return new MerkleTrees(partitioner, trees); + } + + public long serializedSize(MerkleTrees trees, int version) + { + assert trees != null; + + long size = TypeSizes.sizeof(trees.merkleTrees.size()); + for (MerkleTree tree : trees.merkleTrees.values()) + { + size += MerkleTree.serializer.serializedSize(tree, version); + } + return size; + } + + } + + private static class TokenRangeComparator implements Comparator<Range<Token>> + { + @Override + public int compare(Range<Token> rt1, Range<Token> rt2) + { + if (rt1.left.compareTo(rt2.left) == 0) + return 0; + + return rt1.compareTo(rt2); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 198b01b,471f8cf..0ce81d3 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@@ -115,10 -125,9 +115,10 @@@ public class CompactionsTes return store; } - private long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) + public static long populate(String ks, String cf, int startRowKey, int endRowKey, int ttl) { long timestamp = System.currentTimeMillis(); + CFMetaData cfm = Keyspace.open(ks).getColumnFamilyStore(cf).metadata; for (int i = startRowKey; i <= endRowKey; i++) { DecoratedKey key = Util.dk(Integer.toString(i)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/413e48e6/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java index 14f5707,61ab3da..9c32cef --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@@ -15,13 -15,22 +15,23 @@@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.repair; -import java.io.IOException; import java.net.InetAddress; -import java.security.MessageDigest; +import java.util.Arrays; + import java.util.Collections; ++import java.util.Iterator; ++import java.util.Map; import java.util.UUID; ++import java.util.concurrent.CompletableFuture; + import java.util.concurrent.TimeUnit; + + import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.SettableFuture; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.compaction.CompactionsTest; + import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.SequentialWriter; import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@@ -39,24 -51,26 +49,29 @@@ import org.apache.cassandra.net.IMessag import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; - import org.apache.cassandra.utils.concurrent.SimpleCondition; ++import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.SimpleCondition; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ValidatorTest { + private static final long TEST_TIMEOUT = 60; //seconds + private static final String keyspace = "ValidatorTest"; private static final String columnFamily = "Standard1"; - private final IPartitioner partitioner = StorageService.getPartitioner(); + private static IPartitioner partitioner; @BeforeClass public static void defineSchema() throws Exception @@@ -78,36 -92,9 +93,9 @@@ public void testValidatorComplete() throws Throwable { Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); + final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); - final SimpleCondition lock = new SimpleCondition(); - MessagingService.instance().addMessageSink(new IMessageSink() - { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) - { - try - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success()); - assertNotNull(((ValidationComplete) m).trees); - } - } - finally - { - lock.signalAll(); - } - return false; - } - - public boolean allowIncomingMessage(MessageIn message, int id) - { - return false; - } - }); - final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); ++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); InetAddress remote = InetAddress.getByName("127.0.0.2"); @@@ -130,37 -116,128 +118,111 @@@ Token min = tree.partitioner().getMinimumToken(); assertNotNull(tree.hash(new Range<>(min, min))); - if (!lock.isSignaled()) - lock.await(); + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success); - assertNotNull(((ValidationComplete) m).tree); ++ assertTrue(((ValidationComplete) m).success()); ++ assertNotNull(((ValidationComplete) m).trees); } - private static class CompactedRowStub extends AbstractCompactedRow - { - private CompactedRowStub(DecoratedKey key) - { - super(key); - } - - public RowIndexEntry write(long currentPosition, SequentialWriter out) throws IOException - { - throw new UnsupportedOperationException(); - } - - public void update(MessageDigest digest) { } - - public ColumnStats columnStats() - { - throw new UnsupportedOperationException(); - } - - public void close() throws IOException { } - } @Test public void testValidatorFailed() throws Throwable { Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken()); - final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range); + final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range)); - final SimpleCondition lock = new SimpleCondition(); - final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); ++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); + + InetAddress remote = InetAddress.getByName("127.0.0.2"); + + Validator validator = new Validator(desc, remote, 0); + validator.fail(); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); - assertFalse(((ValidationComplete) m).success); - assertNull(((ValidationComplete) m).tree); ++ assertFalse(((ValidationComplete) m).success()); ++ assertNull(((ValidationComplete) m).trees); + } + + @Test + public void simpleValidationTest128() throws Exception + { + simpleValidationTest(128); + } + + @Test + public void simpleValidationTest1500() throws Exception + { + simpleValidationTest(1500); + } + + /** + * Test for CASSANDRA-5263 + * 1. Create N rows + * 2. Run validation compaction + * 3. Expect merkle tree with size 2^(log2(n)) + */ + public void simpleValidationTest(int n) throws Exception + { + Keyspace ks = Keyspace.open(keyspace); + ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily); + cfs.clearUnsafe(); + + // disable compaction while flushing + cfs.disableAutoCompaction(); + + CompactionsTest.populate(keyspace, columnFamily, 0, n, 0); //ttl=3s + + cfs.forceBlockingFlush(); - assertEquals(1, cfs.getSSTables().size()); ++ assertEquals(1, cfs.getLiveSSTables().size()); + + // wait enough to force single compaction + TimeUnit.SECONDS.sleep(5); + - SSTableReader sstable = cfs.getSSTables().iterator().next(); ++ SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + UUID repairSessionId = UUIDGen.getTimeUUID(); + final RepairJobDesc desc = new RepairJobDesc(repairSessionId, UUIDGen.getTimeUUID(), cfs.keyspace.getName(), - cfs.getColumnFamilyName(), new Range<Token>(sstable.first.getToken(), - sstable.last.getToken())); ++ cfs.getColumnFamilyName(), Collections.singletonList(new Range<>(sstable.first.getToken(), ++ sstable.last.getToken()))); + + ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(), - Collections.singletonList(cfs), Collections.singleton(desc.range), - false, false); ++ Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE, ++ false); + - final ListenableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); ++ final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true); + CompactionManager.instance.submitValidation(cfs, validator); + + MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); + assertEquals(MessagingService.Verb.REPAIR_MESSAGE, message.verb); + RepairMessage m = (RepairMessage) message.payload; + assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); + assertEquals(desc, m.desc); - assertTrue(((ValidationComplete) m).success); - MerkleTree tree = ((ValidationComplete) m).tree; ++ assertTrue(((ValidationComplete) m).success()); ++ MerkleTrees trees = ((ValidationComplete) m).trees; + - assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), tree.size(), 0.0); - assertEquals(tree.rowCount(), n); ++ Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = trees.iterator(); ++ while (iterator.hasNext()) ++ { ++ assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0); ++ } ++ assertEquals(trees.rowCount(), n); + } + - private ListenableFuture<MessageOut> registerOutgoingMessageSink() ++ private CompletableFuture<MessageOut> registerOutgoingMessageSink() + { - final SettableFuture<MessageOut> future = SettableFuture.create(); ++ final CompletableFuture<MessageOut> future = new CompletableFuture<>(); MessagingService.instance().addMessageSink(new IMessageSink() { public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) { - try - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType); - assertEquals(desc, m.desc); - assertFalse(((ValidationComplete) m).success()); - assertNull(((ValidationComplete) m).trees); - } - } - finally - { - lock.signalAll(); - } - future.set(message); ++ future.complete(message); return false; }