Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442f4737 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442f4737 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442f4737 Branch: refs/heads/cassandra-3.0 Commit: 442f4737ceddd34f14210da49cee4d48b468f01e Parents: 2f8e5f3 b5d6d4f Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Jan 25 10:05:26 2016 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Jan 25 10:05:59 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Memtable.java | 9 + .../db/compaction/CompactionController.java | 19 +- .../db/compaction/CompactionControllerTest.java | 195 +++++++++++++++++++ 4 files changed, 221 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index bb26b20,cdc3b34..70abffe --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,30 -1,8 +1,31 @@@ +3.0.3 + * Update CQL documentation (CASSANDRA-10899) + * Check the column name, not cell name, for dropped columns when reading + legacy sstables (CASSANDRA-11018) + * Don't attempt to index clustering values of static rows (CASSANDRA-11021) + * Remove checksum files after replaying hints (CASSANDRA-10947) + * Support passing base table metadata to custom 2i validation (CASSANDRA-10924) + * Ensure stale index entries are purged during reads (CASSANDRA-11013) + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954) + * Fix UnsupportedOperationException when reading old sstable with range + tombstone (CASSANDRA-10743) + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910) + * Fix potential assertion error during compaction (CASSANDRA-10944) + * Fix counting of received sstables in streaming (CASSANDRA-10949) + * Implement hints compression (CASSANDRA-9428) + * Fix potential assertion error when reading static columns (CASSANDRA-10903) + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711) + * Avoid building PartitionUpdate in toString (CASSANDRA-10897) + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797) + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873) + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653) + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837) + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806) +Merged from 2.2: 2.2.5 + * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949) * Apply change to compaction throughput in real time (CASSANDRA-10025) * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955) - * Avoid over-fetching during the page of range queries (CASSANDRA-8521) * Start L0 STCS-compactions even if there is a L0 -> L1 compaction going (CASSANDRA-10979) * Make UUID LSB unique per process (CASSANDRA-7925) http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Memtable.java index 96b1775,fb4da72..5d5f7bf --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -237,12 -227,24 +240,13 @@@ public class Memtable implements Compar } } - final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer); - minTimestamp = Math.min(minTimestamp, updater.minTimestamp); - liveDataSize.addAndGet(initialSize + updater.dataSize); - currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount()); - return updater.colUpdateTimeDelta; - } - - // for debugging - public String contents() - { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet()) - { - builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", "); - } - builder.append("}"); - return builder.toString(); + long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer); ++ minTimestamp = Math.min(minTimestamp, previous.stats().minTimestamp); + liveDataSize.addAndGet(initialSize + pair[0]); + columnsCollector.update(update.columns()); + statsCollector.update(update.stats()); + currentOperations.addAndGet(update.operationCount()); + return pair[1]; } public int partitionCount() http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index 259e1b9,00d1344..5cb60c5 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -19,9 -19,6 +19,11 @@@ package org.apache.cassandra.db.compact import java.util.*; ++import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import com.google.common.collect.Iterables; + ++import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -195,6 -190,13 +201,13 @@@ public class CompactionController imple else if (sstable.getBloomFilter().isPresent(key)) min = Math.min(min, sstable.getMinTimestamp()); } + + for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) + { - ColumnFamily cf = memtable.getColumnFamily(key); - if (cf != null) - min = Math.min(min, memtable.getMinTimestamp()); ++ Partition partition = memtable.getPartition(key); ++ if (partition != null) ++ min = Math.min(min, partition.stats().minTimestamp); + } return min; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/442f4737/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index 0000000,750a38e..e781716 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@@ -1,0 -1,191 +1,195 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db.compaction; + + import java.nio.ByteBuffer; + import java.util.Set; + + import com.google.common.collect.Sets; + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; ++import org.apache.cassandra.Util; ++import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.DecoratedKey; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.composites.CellName; ++import org.apache.cassandra.db.RowUpdateBuilder; ++import org.apache.cassandra.db.marshal.AsciiType; ++import org.apache.cassandra.db.partitions.PartitionUpdate; + import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.locator.SimpleStrategy; ++import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.utils.ByteBufferUtil; + import org.apache.cassandra.utils.FBUtilities; + -import static org.apache.cassandra.Util.cellname; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNotNull; + + public class CompactionControllerTest extends SchemaLoader + { + private static final String KEYSPACE = "CompactionControllerTest"; + private static final String CF1 = "Standard1"; + private static final String CF2 = "Standard2"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE, CF1), - SchemaLoader.standardCFMD(KEYSPACE, CF2)); ++ KeyspaceParams.simple(1), ++ CFMetaData.Builder.create(KEYSPACE, CF1, true, false, false) ++ .addPartitionKey("pk", AsciiType.instance) ++ .addClusteringColumn("ck", AsciiType.instance) ++ .addRegularColumn("val", AsciiType.instance) ++ .build(), ++ CFMetaData.Builder.create(KEYSPACE, CF2, true, false, false) ++ .addPartitionKey("pk", AsciiType.instance) ++ .addClusteringColumn("ck", AsciiType.instance) ++ .addRegularColumn("val", AsciiType.instance) ++ .build()); + } + + @Test + public void testMaxPurgeableTimestamp() + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + - ByteBuffer rowKey = ByteBufferUtil.bytes("k1"); - DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey); ++ DecoratedKey key = Util.dk("k1"); + + long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp + long timestamp2 = timestamp1 - 5; + long timestamp3 = timestamp2 - 5; // oldest timestamp + + // add to first memtable - applyMutation(CF1, rowKey, timestamp1); ++ applyMutation(cfs.metadata, key, timestamp1); + + // check max purgeable timestamp without any sstables + try(CompactionController controller = new CompactionController(cfs, null, 0)) + { + assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only + + cfs.forceBlockingFlush(); + assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables + } + - Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting ++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); // first sstable is compacting + + // create another sstable - applyMutation(CF1, rowKey, timestamp2); ++ applyMutation(cfs.metadata, key, timestamp2); + cfs.forceBlockingFlush(); + + // check max purgeable timestamp when compacting the first sstable with and without a memtable + try (CompactionController controller = new CompactionController(cfs, compacting, 0)) + { + assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only + - applyMutation(CF1, rowKey, timestamp3); ++ applyMutation(cfs.metadata, key, timestamp3); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable + } + + // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable + cfs.forceBlockingFlush(); + + //newest to oldest + try (CompactionController controller = new CompactionController(cfs, null, 0)) + { - applyMutation(CF1, rowKey, timestamp1); - applyMutation(CF1, rowKey, timestamp2); - applyMutation(CF1, rowKey, timestamp3); ++ applyMutation(cfs.metadata, key, timestamp1); ++ applyMutation(cfs.metadata, key, timestamp2); ++ applyMutation(cfs.metadata, key, timestamp3); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + } + + cfs.forceBlockingFlush(); + + //oldest to newest + try (CompactionController controller = new CompactionController(cfs, null, 0)) + { - applyMutation(CF1, rowKey, timestamp3); - applyMutation(CF1, rowKey, timestamp2); - applyMutation(CF1, rowKey, timestamp1); ++ applyMutation(cfs.metadata, key, timestamp3); ++ applyMutation(cfs.metadata, key, timestamp2); ++ applyMutation(cfs.metadata, key, timestamp1); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + } + } + + @Test + public void testGetFullyExpiredSSTables() + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2); + cfs.truncateBlocking(); + - ByteBuffer rowKey = ByteBufferUtil.bytes("k1"); ++ DecoratedKey key = Util.dk("k1"); + + long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp + long timestamp2 = timestamp1 - 5; + long timestamp3 = timestamp2 - 5; // oldest timestamp + + // create sstable with tombstone that should be expired in no older timestamps - applyDeleteMutation(CF2, rowKey, timestamp2); ++ applyDeleteMutation(cfs.metadata, key, timestamp2); + cfs.forceBlockingFlush(); + + // first sstable with tombstone is compacting - Set<SSTableReader> compacting = Sets.newHashSet(cfs.getSSTables()); ++ Set<SSTableReader> compacting = Sets.newHashSet(cfs.getLiveSSTables()); + + // create another sstable with more recent timestamp - applyMutation(CF2, rowKey, timestamp1); ++ applyMutation(cfs.metadata, key, timestamp1); + cfs.forceBlockingFlush(); + + // second sstable is overlapping - Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting); ++ Set<SSTableReader> overlapping = Sets.difference(Sets.newHashSet(cfs.getLiveSSTables()), compacting); + + // the first sstable should be expired because the overlapping sstable is newer and the gc period is later + int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5; + Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore); + assertNotNull(expired); + assertEquals(1, expired.size()); + assertEquals(compacting.iterator().next(), expired.iterator().next()); + + // however if we add an older mutation to the memtable then the sstable should not be expired - applyMutation(CF2, rowKey, timestamp3); ++ applyMutation(cfs.metadata, key, timestamp3); + expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore); + assertNotNull(expired); + assertEquals(0, expired.size()); + } + - private void applyMutation(String cf, ByteBuffer rowKey, long timestamp) ++ private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp) + { - CellName colName = cellname("birthdate"); + ByteBuffer val = ByteBufferUtil.bytes(1L); + - Mutation rm = new Mutation(KEYSPACE, rowKey); - rm.add(cf, colName, val, timestamp); - rm.applyUnsafe(); ++ new RowUpdateBuilder(cfm, timestamp, key) ++ .clustering("ck") ++ .add("val", val) ++ .build() ++ .applyUnsafe(); + } + - private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp) ++ private void applyDeleteMutation(CFMetaData cfm, DecoratedKey key, long timestamp) + { - Mutation rm = new Mutation(KEYSPACE, rowKey); - rm.delete(cf, timestamp); - rm.applyUnsafe(); ++ new Mutation(PartitionUpdate.fullPartitionDelete(cfm, key, timestamp, FBUtilities.nowInSeconds())) ++ .applyUnsafe(); + } - - - + }