Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f19dd4e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f19dd4e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f19dd4e Branch: refs/heads/cassandra-3.0 Commit: 9f19dd4e4e4f4adc948b36a3fd38077cbc691617 Parents: a320737 dbfeeac Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Dec 2 08:53:33 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Dec 2 08:53:33 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 3 +- src/java/org/apache/cassandra/dht/Range.java | 44 +++++++++++++ .../org/apache/cassandra/dht/RangeTest.java | 66 ++++++++++++++++++++ 4 files changed, 113 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index cf73f57,b0f9588..eaad3a2 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,23 -1,5 +1,24 @@@ -2.1.12 +2.2.4 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225) + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775) + * Reject index queries while the index is building (CASSANDRA-8505) + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747) + * Fix JSON update with prepared statements (CASSANDRA-10631) + * Don't do anticompaction after subrange repair (CASSANDRA-10422) + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) +Merged from 2.1: + * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) * Add proper error handling to stream receiver (CASSANDRA-10774) * Warn or fail when changing cluster topology live (CASSANDRA-10243) * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index f3a69a6,2630ba2..65f93c0 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1192,90 -1092,68 +1192,91 @@@ public class CompactionManager implemen if (!new File(sstable.getFilename()).exists()) { logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable); + i.remove(); continue; } + if (groupMaxDataAge < sstable.maxDataAge) + groupMaxDataAge = sstable.maxDataAge; + } - logger.info("Anticompacting {}", sstable); - Set<SSTableReader> sstableAsSet = new HashSet<>(); - sstableAsSet.add(sstable); + if (anticompactionGroup.originals().size() == 0) + { + logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); + return 0; + } - File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false); + logger.info("Anticompacting {}", anticompactionGroup); + Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); + + File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); + long repairedKeyCount = 0; + long unrepairedKeyCount = 0; + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); + try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); + SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); + CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) + { + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable))); - CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) - { - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys()); - repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable)); + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); - CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - metrics.beginCompaction(ci); - try + CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()); + metrics.beginCompaction(ci); + try + { + @SuppressWarnings("resource") + CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); ++ Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); + while (iter.hasNext()) { - Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); - while (iter.hasNext()) + @SuppressWarnings("resource") + AbstractCompactedRow row = iter.next(); + // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) ++ if (containmentChecker.contains(row.key.getToken())) { - AbstractCompactedRow row = iter.next(); - if (containmentChecker.contains(row.key.getToken())) - { - repairedSSTableWriter.append(row); - repairedKeyCount++; - } - // otherwise save into the new 'non-repaired' table - else - { - unRepairedSSTableWriter.append(row); - unrepairedKeyCount++; - } + repairedSSTableWriter.append(row); + repairedKeyCount++; + } + // otherwise save into the new 'non-repaired' table + else + { + unRepairedSSTableWriter.append(row); + unrepairedKeyCount++; } } - finally - { - metrics.finishCompaction(ci); - } - anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); } - catch (Throwable e) + finally { - JVMStabilityInspector.inspectThrowable(e); - logger.error("Error anticompacting " + sstable, e); - repairedSSTableWriter.abort(); - unRepairedSSTableWriter.abort(); + metrics.finishCompaction(ci); } - } - String format = "Repaired {} keys of {} for {}/{}"; - logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName()); - String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size()); - return anticompactedSSTables; + List<SSTableReader> anticompactedSSTables = new ArrayList<>(); + // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, + // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API + // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted. + anticompactionGroup.permitRedundantTransitions(); + repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit(); + unRepairedSSTableWriter.prepareToCommit(); + anticompactedSSTables.addAll(repairedSSTableWriter.finished()); + anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); + repairedSSTableWriter.commit(); + unRepairedSSTableWriter.commit(); + + logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, + repairedKeyCount + unrepairedKeyCount, + cfs.keyspace.getName(), + cfs.getColumnFamilyName(), + anticompactionGroup); + return anticompactedSSTables.size(); + } + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.error("Error anticompacting " + anticompactionGroup, e); + } + return 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/Range.java index cbf093c,81c92a2..9893531 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@@ -21,7 -21,10 +21,9 @@@ import java.io.Serializable import java.util.*; import org.apache.commons.lang3.ObjectUtils; + + import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; /** @@@ -465,13 -472,67 +467,55 @@@ public class Range<T extends RingPositi /** * Compute a range of keys corresponding to a given range of token. */ - public static Range<RowPosition> makeRowRange(Token left, Token right, IPartitioner partitioner) + public static Range<RowPosition> makeRowRange(Token left, Token right) { - return new Range<RowPosition>(left.maxKeyBound(partitioner), right.maxKeyBound(partitioner), partitioner); + return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound()); } - @SuppressWarnings("unchecked") - public AbstractBounds<RowPosition> toRowBounds() + public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds) { - return (left instanceof Token) ? makeRowRange((Token)left, (Token)right, partitioner) : (Range<RowPosition>)this; - } - - @SuppressWarnings("unchecked") - public AbstractBounds<Token> toTokenBounds() - { - return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this; - } - - public AbstractBounds<T> withNewRight(T newRight) - { - return new Range<T>(left, newRight); + return makeRowRange(tokenBounds.left, tokenBounds.right); } + + /** + * Helper class to check if a token is contained within a given collection of ranges + */ + public static class OrderedRangeContainmentChecker + { + private final Iterator<Range<Token>> normalizedRangesIterator; + private Token lastToken = null; + private Range<Token> currentRange; + + public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges) + { + normalizedRangesIterator = normalize(ranges).iterator(); + assert normalizedRangesIterator.hasNext(); + currentRange = normalizedRangesIterator.next(); + } + + /** + * Returns true if the ranges given in the constructor contains the token, false otherwise. + * + * The tokens passed to this method must be in increasing order + * + * @param t token to check, must be larger than or equal to the last token passed + * @return true if the token is contained within the ranges given to the constructor. + */ + public boolean contains(Token t) + { + assert lastToken == null || lastToken.compareTo(t) <= 0; + lastToken = t; + while (true) + { + if (t.compareTo(currentRange.left) <= 0) + return false; + else if (t.compareTo(currentRange.right) <= 0 || currentRange.right.compareTo(currentRange.left) <= 0) + return true; + + if (!normalizedRangesIterator.hasNext()) + return false; + currentRange = normalizedRangesIterator.next(); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/dht/RangeTest.java index d93356a,1d8123b..85f2586 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@@ -27,11 -32,10 +32,12 @@@ import static java.util.Arrays.asList import org.apache.commons.lang3.StringUtils; import org.junit.Test; - import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; + import static org.apache.cassandra.Util.range; + import static org.junit.Assert.*; public class RangeTest @@@ -540,4 -543,64 +546,64 @@@ expected = asList(range("", "")); assertNormalize(input, expected); } + + @Test + public void testRandomOrderedRangeContainmentChecker() + { + Random r = new Random(); + for (int j = 0; j < 1000; j++) + { + int numTokens = r.nextInt(300) + 1; + List<Range<Token>> ranges = new ArrayList<>(numTokens); + List<Token> tokens = new ArrayList<>(2 * numTokens); + for (int i = 0; i < 2 * numTokens; i++) + tokens.add(t(r.nextLong())); + + Collections.sort(tokens); + + for (int i = 0; i < tokens.size(); i++) + { + ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1))); + i++; + } + + List<Token> tokensToTest = new ArrayList<>(); + for (int i = 0; i < 10000; i++) + tokensToTest.add(t(r.nextLong())); + + tokensToTest.add(t(Long.MAX_VALUE)); + tokensToTest.add(t(Long.MIN_VALUE)); + tokensToTest.add(t(Long.MAX_VALUE - 1)); + tokensToTest.add(t(Long.MIN_VALUE + 1)); + Collections.sort(tokensToTest); + + Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); + for (Token t : tokensToTest) + { + if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration + fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t)); + } + } + } + + @Test + public void testBoundariesORCC() + { + List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE)); + Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); + assertFalse(checker.contains(t(Long.MIN_VALUE))); + assertTrue(checker.contains(t(Long.MIN_VALUE + 1))); + assertFalse(checker.contains(t(0))); + assertFalse(checker.contains(t(Long.MAX_VALUE - 1))); + assertTrue(checker.contains(t(Long.MAX_VALUE))); + } + + private static Range<Token> r(long left, long right) + { + return new Range<>(t(left), t(right)); + } + private static Token t(long t) + { - return new LongToken(t); ++ return new Murmur3Partitioner.LongToken(t); + } }