Updated Branches: refs/heads/trunk 95b833d42 -> 8aaaacd09
Use EntryWeigher instead of Weigher to measure the cache; patch by Vijay; reviewed by Jonathan Ellis for CASSANDRA-4315 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8aaaacd0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8aaaacd0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8aaaacd0 Branch: refs/heads/trunk Commit: 8aaaacd09d8e34c93271383a0c86ecce38ef1965 Parents: 95b833d Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Thu Jun 14 18:10:42 2012 -0700 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Thu Jun 14 18:10:42 2012 -0700 ---------------------------------------------------------------------- .../cassandra/cache/ConcurrentLinkedHashCache.java | 51 ++++++--------- .../cache/ConcurrentLinkedHashCacheProvider.java | 24 +------ .../apache/cassandra/cache/IRowCacheProvider.java | 4 +- .../apache/cassandra/cache/SerializingCache.java | 23 ++++--- .../cassandra/cache/SerializingCacheProvider.java | 4 +- .../org/apache/cassandra/db/RowIndexEntry.java | 8 ++- .../org/apache/cassandra/service/CacheService.java | 34 ++++++++-- .../apache/cassandra/cache/CacheProviderTest.java | 7 ++- 8 files changed, 78 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java index 8f36b15..375a7d0 100644 --- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java +++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java @@ -19,10 +19,10 @@ package org.apache.cassandra.cache; import java.util.Set; -import 
com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; -import com.googlecode.concurrentlinkedhashmap.Weigher; +import org.github.jamm.MemoryMeter; -import com.googlecode.concurrentlinkedhashmap.Weighers; +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.googlecode.concurrentlinkedhashmap.EntryWeigher; /** Wrapper so CLHM can implement ICache interface. * (this is what you get for making library classes final.) */ @@ -30,42 +30,20 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V> { public static final int DEFAULT_CONCURENCY_LEVEL = 64; private final ConcurrentLinkedHashMap<K, V> map; + private static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead(); - public ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map) + private ConcurrentLinkedHashCache(ConcurrentLinkedHashMap<K, V> map) { this.map = map; } /** - * Initialize a cache with weigher = Weighers.singleton() and initial capacity 0 - * - * @param capacity cache weighted capacity - * - * @param <K> key type - * @param <V> value type - * - * @return initialized cache - */ - public static <K, V> ConcurrentLinkedHashCache<K, V> create(long capacity) - { - return create(capacity, Weighers.<V>singleton()); - } - - /** - * Initialize a cache with initial capacity set to 0 - * - * @param weightedCapacity cache weighted capacity - * @param weigher The weigher to use - * - * @param <K> key type - * @param <V> value type - * - * @return initialized cache + * Initialize a cache with initial capacity with weightedCapacity */ - public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, Weigher<V> weigher) + public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity, EntryWeigher<K, V> entryWeiger) { ConcurrentLinkedHashMap<K, V> map = new ConcurrentLinkedHashMap.Builder<K, V>() - .weigher(weigher) + .weigher(entryWeiger) .maximumWeightedCapacity(weightedCapacity) 
.concurrencyLevel(DEFAULT_CONCURENCY_LEVEL) .build(); @@ -73,6 +51,19 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V> return new ConcurrentLinkedHashCache<K, V>(map); } + public static <K, V> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity) + { + return create(weightedCapacity, new EntryWeigher<K, V>() + { + public int weightOf(K key, V value) + { + long size = meter.measure(key) + meter.measure(value); + assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB/Integer.MAX_VALUE"; + return (int) size; + } + }); + } + public long capacity() { return map.capacity(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java index b66b8b3..e1e06ee 100644 --- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java @@ -17,30 +17,10 @@ */ package org.apache.cassandra.cache; -import com.googlecode.concurrentlinkedhashmap.Weigher; -import com.googlecode.concurrentlinkedhashmap.Weighers; - -import org.github.jamm.MemoryMeter; - public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider { - public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher) - { - return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher - ? 
createMemoryWeigher() - : Weighers.<IRowCacheEntry>singleton()); - } - - private static Weigher<IRowCacheEntry> createMemoryWeigher() + public ICache<RowCacheKey, IRowCacheEntry> create(long capacity) { - return new Weigher<IRowCacheEntry>() - { - final MemoryMeter meter = new MemoryMeter(); - - public int weightOf(IRowCacheEntry value) - { - return (int) Math.min(meter.measure(value), Integer.MAX_VALUE); - } - }; + return ConcurrentLinkedHashCache.create(capacity); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/IRowCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java index 2e3ff99..003bfae 100644 --- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java @@ -17,12 +17,10 @@ */ package org.apache.cassandra.cache; -import org.apache.cassandra.db.ColumnFamily; - /** * Provides cache objects with a requested capacity. 
*/ public interface IRowCacheProvider { - public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher); + public ICache<RowCacheKey, IRowCacheEntry> create(long capacity); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java index 9e36a89..54067e3 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCache.java +++ b/src/java/org/apache/cassandra/cache/SerializingCache.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.util.Set; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.googlecode.concurrentlinkedhashmap.EntryWeigher; import com.googlecode.concurrentlinkedhashmap.EvictionListener; import com.googlecode.concurrentlinkedhashmap.Weigher; -import com.googlecode.concurrentlinkedhashmap.Weighers; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; @@ -49,7 +49,7 @@ public class SerializingCache<K, V> implements ICache<K, V> private final ConcurrentLinkedHashMap<K, FreeableMemory> map; private final ISerializer<V> serializer; - public SerializingCache(long capacity, boolean useMemoryWeigher, ISerializer<V> serializer) + private SerializingCache(long capacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer) { this.serializer = serializer; @@ -62,24 +62,29 @@ public class SerializingCache<K, V> implements ICache<K, V> }; this.map = new ConcurrentLinkedHashMap.Builder<K, FreeableMemory>() - .weigher(useMemoryWeigher - ? 
createMemoryWeigher() - : Weighers.<FreeableMemory>singleton()) + .weigher(weigher) .maximumWeightedCapacity(capacity) .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL) .listener(listener) .build(); } - private static Weigher<FreeableMemory> createMemoryWeigher() + public static <K, V> SerializingCache<K, V> create(long weightedCapacity, Weigher<FreeableMemory> weigher, ISerializer<V> serializer) { - return new Weigher<FreeableMemory>() + return new SerializingCache<K, V>(weightedCapacity, weigher, serializer); + } + + public static <K, V> SerializingCache<K, V> create(long weightedCapacity, ISerializer<V> serializer) + { + return create(weightedCapacity, new Weigher<FreeableMemory>() { public int weightOf(FreeableMemory value) { - return (int) Math.min(value.size(), Integer.MAX_VALUE); + long size = value.size(); + assert size < Integer.MAX_VALUE : "Serialized size cannot be more than 2GB"; + return (int) size; } - }; + }, serializer); } private V deserialize(FreeableMemory mem) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java index 6adcd89..c8d11d2 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java @@ -29,9 +29,9 @@ import org.apache.cassandra.net.MessagingService; public class SerializingCacheProvider implements IRowCacheProvider { - public ICache<RowCacheKey, IRowCacheEntry> create(long capacity, boolean useMemoryWeigher) + public ICache<RowCacheKey, IRowCacheEntry> create(long capacity) { - return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer()); + return SerializingCache.create(capacity, new RowCacheSerializer()); } // 
Package protected for tests http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 0c50746..b7660e5 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -20,10 +20,8 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import org.apache.cassandra.io.sstable.Descriptor; @@ -43,6 +41,11 @@ public class RowIndexEntry this.position = position; } + public int serializedSize() + { + return TypeSizes.NATIVE.sizeof(position); + } + public static RowIndexEntry create(long position, DeletionInfo deletionInfo, ColumnIndex index) { if (index != null && index.columnsIndex != null && index.columnsIndex.size() > 1) @@ -184,6 +187,7 @@ public class RowIndexEntry return bloomFilter; } + @Override public int serializedSize() { TypeSizes typeSizes = TypeSizes.NATIVE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 77f8349..6c7b0a2 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -39,7 +39,6 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; -import 
org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.Descriptor; @@ -48,16 +47,18 @@ import org.apache.cassandra.io.sstable.SSTableReader.Operator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.googlecode.concurrentlinkedhashmap.EntryWeigher; + public class CacheService implements CacheServiceMBean { private static final Logger logger = LoggerFactory.getLogger(CacheService.class); public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Caches"; - public static final int AVERAGE_KEY_CACHE_ROW_SIZE = 48; public static enum CacheType { @@ -117,7 +118,25 @@ public class CacheService implements CacheServiceMBean // as values are constant size we can use singleton weigher // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value) - ICache<KeyCacheKey, RowIndexEntry> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE); + ICache<KeyCacheKey, RowIndexEntry> kc; + if (MemoryMeter.isInitialized()) + { + kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity); + } + else + { + logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); KeyCache size in JVM Heap will not be calculated accurately. " + + "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead"); + /* We don't know the overhead size because memory meter is not enabled. 
*/ + EntryWeigher<KeyCacheKey, RowIndexEntry> weigher = new EntryWeigher<KeyCacheKey, RowIndexEntry>() + { + public int weightOf(KeyCacheKey key, RowIndexEntry entry) + { + return key.key.length + entry.serializedSize(); + } + }; + kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity, weigher); + } AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer()); int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); @@ -143,7 +162,7 @@ public class CacheService implements CacheServiceMBean long rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024; // cache object - ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true); + ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity); AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer()); int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave(); @@ -245,7 +264,7 @@ public class CacheService implements CacheServiceMBean public long getKeyCacheCapacityInBytes() { - return keyCache.getCapacity() * AVERAGE_KEY_CACHE_ROW_SIZE; + return keyCache.getCapacity(); } public long getKeyCacheCapacityInMB() @@ -258,7 +277,8 @@ public class CacheService implements CacheServiceMBean if (capacity < 0) throw new RuntimeException("capacity should not be negative."); - keyCache.setCapacity(capacity * 1024 * 1024 / 48); + long weightedCapacity = capacity * 1024 * 1024; + keyCache.setCapacity(MemoryMeter.isInitialized() ? 
weightedCapacity : (weightedCapacity / 48)); } public long getRowCacheSize() @@ -268,7 +288,7 @@ public class CacheService implements CacheServiceMBean public long getKeyCacheSize() { - return keyCache.weightedSize() * AVERAGE_KEY_CACHE_ROW_SIZE; + return keyCache.weightedSize(); } public void reduceCacheSizes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8aaaacd0/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 46164bf..8b4d17b 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -30,6 +30,9 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamily; + +import com.googlecode.concurrentlinkedhashmap.Weighers; + import static org.apache.cassandra.Util.column; import static org.junit.Assert.*; @@ -105,7 +108,7 @@ public class CacheProviderTest extends SchemaLoader @Test public void testHeapCache() throws InterruptedException { - ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY); + ICache<String, IRowCacheEntry> cache = ConcurrentLinkedHashCache.create(CAPACITY, Weighers.<String, IRowCacheEntry>entrySingleton()); ColumnFamily cf = createCF(); simpleCase(cf, cache); concurrentCase(cf, cache); @@ -114,7 +117,7 @@ public class CacheProviderTest extends SchemaLoader @Test public void testSerializingCache() throws InterruptedException { - ICache<String, IRowCacheEntry> cache = new SerializingCache<String, IRowCacheEntry>(CAPACITY, false, new SerializingCacheProvider.RowCacheSerializer()); + ICache<String, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<FreeableMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer()); ColumnFamily cf = 
createCF(); simpleCase(cf, cache); concurrentCase(cf, cache);