Updated Branches: refs/heads/trunk 98e6b08c7 -> ab6eaed8f
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 300a21d..26f6d9d 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.columniterator.SSTableSliceIterator; import org.apache.cassandra.db.composites.CType; +import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.IVersionedSerializer; @@ -340,6 +341,41 @@ public class SliceQueryFilter implements IDiskAtomFilter return false; } + public boolean isHeadFilter() /* true for a single, non-reversed slice whose start bound is empty */ + { + return slices.length == 1 && slices[0].start.isEmpty() && !reversed; + } + + public boolean countCQL3Rows(CellNameType comparator) + { + // If the comparator is dense, a cell == a CQL3 row, so we're always counting CQL3 rows + // in that case. Otherwise, we do so only if we group the cells into CQL3 rows. + return comparator.isDense() || compositesToGroup >= 0; + } + + public boolean isFullyCoveredBy(ColumnFamily cf, long now) + { + // cf is the beginning of a partition. It covers this filter if: + // 1) either this filter requests the head of the partition and requests no more + // than what cf has to offer (note: we do need to use getLiveCount() for that + // as it knows whether the filter counts cells or CQL3 rows). + // 2) or both the start and finish bounds of this filter are non-empty and fall within the cells held by cf.
+ if (isHeadFilter() && count <= getLiveCount(cf, now)) + return true; + + if (start().isEmpty() || finish().isEmpty() || cf.getColumnCount() == 0) + return false; + + Composite low = isReversed() ? finish() : start(); + Composite high = isReversed() ? start() : finish(); + + CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name(); + CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name(); + + return cf.getComparator().compare(first, low) <= 0 + && cf.getComparator().compare(high, last) <= 0; + } + public static class Serializer implements IVersionedSerializer<SliceQueryFilter> { private CType type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index f6e3030..b414628 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -89,6 +89,12 @@ public class ColumnFamilyMetrics public final Histogram liveScannedHistogram; /** Disk space used by snapshot files which */ public final Gauge<Long> trueSnapshotsSize; + /** Row cache hits, but result out of range */ + public final Counter rowCacheHitOutOfRange; + /** Number of row cache hits */ + public final Counter rowCacheHit; + /** Number of row cache misses */ + public final Counter rowCacheMiss; public final Timer coordinatorReadLatency; public final Timer coordinatorScanLatency; @@ -104,6 +110,7 @@ public class ColumnFamilyMetrics @Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35); @Deprecated public final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35); + /** * Creates metrics for given {@link ColumnFamilyStore}.
* @@ -347,6 +354,9 @@ public class ColumnFamilyMetrics return cfs.trueSnapshotsSize(); } }); + rowCacheHitOutOfRange = Metrics.newCounter(factory.createMetricName("RowCacheHitOutOfRange")); + rowCacheHit = Metrics.newCounter(factory.createMetricName("RowCacheHit")); + rowCacheMiss = Metrics.newCounter(factory.createMetricName("RowCacheMiss")); } public void updateSSTableIterated(int count) @@ -390,6 +400,9 @@ public class ColumnFamilyMetrics Metrics.defaultRegistry().removeMetric(factory.createMetricName("CoordinatorReadLatency")); Metrics.defaultRegistry().removeMetric(factory.createMetricName("CoordinatorScanLatency")); Metrics.defaultRegistry().removeMetric(factory.createMetricName("SnapshotsSize")); + Metrics.defaultRegistry().removeMetric(factory.createMetricName("RowCacheHitOutOfRange")); + Metrics.defaultRegistry().removeMetric(factory.createMetricName("RowCacheHit")); + Metrics.defaultRegistry().removeMetric(factory.createMetricName("RowCacheMiss")); /* must match the name the counter was registered under ("RowCacheMiss"), otherwise it is never unregistered */ } class ColumnFamilyMetricNameFactory implements MetricNameFactory http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index d8c26bc..4d82f1f 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -24,7 +24,9 @@ import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -45,8 +47,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import
org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; @@ -287,6 +291,17 @@ public class CacheService implements CacheServiceMBean rowCache.clear(); } + public void invalidateRowCacheForCf(UUID cfId) /* removes every row-cache key whose cfId equals the given id */ + { + Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator(); + while (rowCacheIterator.hasNext()) + { + RowCacheKey rowCacheKey = rowCacheIterator.next(); + if (rowCacheKey.cfId.equals(cfId)) + rowCacheIterator.remove(); /* removal goes through the iterator while walking the key set; presumably getKeySet() is a live view so this evicts the entry - TODO confirm */ + } + } + public void invalidateCounterCache() { counterCache.clear(); @@ -427,7 +442,8 @@ public class CacheService implements CacheServiceMBean public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception { DecoratedKey key = cfs.partitioner.decorateKey(buffer); - ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, cfs.name, Long.MIN_VALUE), Integer.MIN_VALUE); + QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE); /* read through the store's readFilterForCache() instead of the identity filter used before */ + ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE); return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data); } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/src/resources/org/apache/cassandra/cli/CliHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/cli/CliHelp.yaml b/src/resources/org/apache/cassandra/cli/CliHelp.yaml index 0202d19..3c167bc 100644 --- a/src/resources/org/apache/cassandra/cli/CliHelp.yaml +++ b/src/resources/org/apache/cassandra/cli/CliHelp.yaml @@ -794,7 +794,11 @@
commands: - KEYS_ONLY - ROWS_ONLY - NONE; + - cells_per_row_to_cache: State the number of cells per row to cache. + Defaults to 100. Set to "ALL" if you want the old cache behaviour. + + Will not be used if row caching is not enabled. - speculative_retry: Speculative retry is used to speculate a read failure. Speculative retry will execute additional read on a different nodes when http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 774efeb..5efc266 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -233,7 +233,8 @@ public class SchemaLoader simple, opts_rf1, standardCFMD(ks_rcs, "CFWithoutCache").caching(CFMetaData.Caching.NONE), - standardCFMD(ks_rcs, "CachedCF").caching(CFMetaData.Caching.ALL))); + standardCFMD(ks_rcs, "CachedCF").caching(CFMetaData.Caching.ALL).rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString("ALL")), + standardCFMD(ks_rcs, "CachedIntCF").defaultValidator(IntegerType.instance).caching(CFMetaData.Caching.ALL))); // CounterCacheSpace schema.add(KSMetaData.testMetadata(ks_ccs, http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab6eaed8/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java index 238f61e..e21f586 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@ -19,15 +19,20 @@ package org.apache.cassandra.db; import java.net.InetAddress; +import java.nio.ByteBuffer; import java.util.Collection; +import java.util.SortedSet; +import java.util.TreeSet; 
import org.junit.AfterClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.CacheService; @@ -153,6 +158,79 @@ public class RowCacheTest extends SchemaLoader rowCacheLoad(100, 50, 0); CacheService.instance.setRowCacheCapacityInMB(0); } + @Test + public void testRowCacheRange() + { + CompactionManager.instance.disableAutoCompaction(); + + Keyspace keyspace = Keyspace.open(KEYSPACE); + String cf = "CachedIntCF"; + ColumnFamilyStore cachedStore = keyspace.getColumnFamilyStore(cf); + long startRowCacheHits = cachedStore.metric.rowCacheHit.count(); + long startRowCacheOutOfRange = cachedStore.metric.rowCacheHitOutOfRange.count(); + // empty the row cache + CacheService.instance.invalidateRowCache(); + + // set global row cache size to 1 MB + CacheService.instance.setRowCacheCapacityInMB(1); + + ByteBuffer key = ByteBufferUtil.bytes("rowcachekey"); + DecoratedKey dk = cachedStore.partitioner.decorateKey(key); + RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk); + Mutation mutation = new Mutation(KEYSPACE, key); + for (int i = 0; i < 200; i++) + mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis()); + mutation.applyUnsafe(); + + // first read populates the row cache, so it must not be counted as a row cache hit + cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf, + Composites.EMPTY, + Composites.EMPTY, + false, 10, System.currentTimeMillis())); + assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count()); + + // do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range +
cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf, + Composites.EMPTY, + Composites.EMPTY, + false, 20, System.currentTimeMillis())); + assertEquals(++startRowCacheHits, cachedStore.metric.rowCacheHit.count()); + assertEquals(startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.count()); + + // get a slice from 95 to 105; only 95->99 are in cache, so this must count as an out-of-range hit, not as a hit + cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf, + CellNames.simpleDense(ByteBufferUtil.bytes(95)), + CellNames.simpleDense(ByteBufferUtil.bytes(105)), + false, 10, System.currentTimeMillis())); + assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count()); + assertEquals(++startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.count()); + + // get a slice with limit > 100 (more than is cached); this must also count as an out-of-range hit + cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf, + Composites.EMPTY, + Composites.EMPTY, + false, 101, System.currentTimeMillis())); + assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count()); + assertEquals(++startRowCacheOutOfRange, cachedStore.metric.rowCacheHitOutOfRange.count()); + + + CacheService.instance.invalidateRowCache(); + + // try to populate row cache with a limit > rows to cache, we should still populate row cache; + cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf, + Composites.EMPTY, + Composites.EMPTY, + false, 105, System.currentTimeMillis())); + assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count()); + // the cache must now hold exactly the first 100 cells (names 0..99), in order + ColumnFamily cachedCf = (ColumnFamily)CacheService.instance.rowCache.get(rck); + assertEquals(100, cachedCf.getColumnCount()); + int i = 0; + for(Cell c : cachedCf) + { + assertEquals(Util.cellname(i++), c.name); + } + } public void rowCacheLoad(int totalKeys, int keysToSave, int offset) throws Exception {