http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java new file mode 100644 index 0000000..a67f164 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.tinylfu; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; + +public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration { + + public static final String PROPERTY_PREFIX = "tinylfu"; + + public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) { + super(conf, type, PROPERTY_PREFIX); + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java new file mode 100644 index 0000000..a68c4e6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.tinylfu; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TinyLfuBlockCacheManager extends BlockCacheManager { + + private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCacheManager.class); + + @Override + protected TinyLfuBlockCache createCache(AccumuloConfiguration conf, CacheType type) { + TinyLfuBlockCacheConfiguration cc = new TinyLfuBlockCacheConfiguration(conf, type); + LOG.info("Creating {} cache with configuration {}", type, cc); + return new TinyLfuBlockCache(cc); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index 9b2b5d9..daa8f22 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -89,6 +89,11 @@ public class SummaryReader { public Stats getStats() { return summaryCache.getStats(); } + + @Override + public long getMaxHeapSize() { + return summaryCache.getMaxHeapSize(); + } } private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration> summarySelector) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java new file mode 100644 index 0000000..e17fb76 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; +import org.junit.Assert; +import org.junit.Test; + +public class BlockCacheFactoryTest { + + @Test + public void testCreateLruBlockCacheFactory() throws Exception { + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager.getInstance(cc); + } + + @Test + public void testCreateTinyLfuBlockCacheFactory() throws Exception { + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + BlockCacheManager.getInstance(cc); + } + + @Test + public void testStartWithDefault() throws Exception { + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + BlockCacheManager manager = BlockCacheManager.getInstance(dc); + manager.start(dc); + Assert.assertNotNull(manager.getBlockCache(CacheType.INDEX)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java new file mode 100644 index 0000000..72ea49c --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.junit.Assert; +import org.junit.Test; + +public class BlockConfigurationHelperTest { + + @Test + public void testGetPropertyPrefix() throws Exception { + Assert.assertEquals("general.custom.cache.block.lru.data.", BlockCacheConfiguration.getPrefix(CacheType.DATA, "lru")); + Assert.assertEquals("general.custom.cache.block.lru.index.", BlockCacheConfiguration.getPrefix(CacheType.INDEX, "lru")); + Assert.assertEquals("general.custom.cache.block.lru.summary.", BlockCacheConfiguration.getPrefix(CacheType.SUMMARY, "lru")); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index a5ab14a..966b6eb 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -22,6 +22,13 @@ import java.util.Random; import junit.framework.TestCase; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; + /** * Tests the concurrent LruBlockCache. * <p> @@ -31,12 +38,50 @@ import junit.framework.TestCase; */ public class TestLruBlockCache extends TestCase { + public void testConfiguration() { + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(1019)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(1000023)); + + LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f) + .multiFactor(0.30f).memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set); + + String defaultPrefix = BlockCacheConfiguration.getDefaultPrefix(LruBlockCacheConfiguration.PROPERTY_PREFIX); + + // this should be overridden by cache type specific setting + cc.set(defaultPrefix + LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, "0.6"); + + // this is not set for the cache type, so should fall back to default + cc.set(defaultPrefix + LruBlockCacheConfiguration.MAP_LOAD_PROPERTY, "0.53"); + + LruBlockCacheConfiguration lbcc = new LruBlockCacheConfiguration(cc, CacheType.INDEX); + + assertEquals(false, lbcc.isUseEvictionThread()); + assertEquals(0.93f, lbcc.getMinFactor()); + assertEquals(0.97f, lbcc.getAcceptableFactor()); + assertEquals(0.20f, lbcc.getSingleFactor()); + assertEquals(0.30f, lbcc.getMultiFactor()); + assertEquals(0.50f, lbcc.getMemoryFactor()); + assertEquals(0.53f, lbcc.getMapLoadFactor()); + assertEquals(5, lbcc.getMapConcurrencyLevel()); + assertEquals(1019, lbcc.getBlockSize()); + assertEquals(1000023, lbcc.getMaxSize()); + } + public void testBackgroundEvictionThread() throws Exception { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 9); // room for 9, will evict - LruBlockCache cache = new LruBlockCache(maxSize, blockSize); + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -53,6 +98,8 @@ public class TestLruBlockCache extends TestCase { } // A single eviction run should have occurred assertEquals(cache.getEvictionCount(), 1); + + manager.stop(); } public void testCacheSimple() throws Exception { @@ -60,7 +107,14 @@ public class TestLruBlockCache extends TestCase { long maxSize = 1000000; long blockSize = calculateBlockSizeDefault(maxSize, 101); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize); + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateRandomBlocks(100, blockSize); @@ -102,6 +156,7 @@ public class TestLruBlockCache extends TestCase { // Thread t = new LruBlockCache.StatisticsThread(cache); // t.start(); // t.join(); + manager.stop(); } public void testCacheEvictionSimple() throws Exception { @@ -109,7 +164,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false); + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).buildMap().forEach(cc::set); + manager.start(cc); + + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -125,13 +189,13 @@ public class TestLruBlockCache extends TestCase { assertEquals(1, cache.getEvictionCount()); // Our expected size overruns acceptable limit - assertTrue(expectedCacheSize > (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + assertTrue(expectedCacheSize > (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR)); // But the cache did not grow beyond max assertTrue(cache.heapSize() < maxSize); // And is still below the acceptable limit - assertTrue(cache.heapSize() < (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + assertTrue(cache.heapSize() < (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR)); // All blocks except block 0 and 1 should be in the cache assertTrue(cache.getBlock(blocks[0].blockName) == null); @@ -139,6 +203,7 @@ public class TestLruBlockCache extends TestCase { for (int i = 2; i < blocks.length; i++) { assertTrue(Arrays.equals(cache.getBlock(blocks[i].blockName).getBuffer(), blocks[i].buf)); } + manager.stop(); } public void testCacheEvictionTwoPriorities() throws Exception { @@ -146,12 +211,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.25f, // single - 0.50f, // multi - 0.25f);// memory + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f) + .multiFactor(0.50f).memoryFactor(0.25f).buildMap().forEach(cc::set); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, 10000, "single"); Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi"); @@ -178,13 +247,13 @@ public class TestLruBlockCache extends TestCase { assertEquals(cache.getEvictedCount(), 2); // Our expected size overruns acceptable limit - assertTrue(expectedCacheSize > (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + assertTrue(expectedCacheSize > (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR)); // But the cache did not grow beyond max assertTrue(cache.heapSize() <= maxSize); // And is now below the acceptable limit - assertTrue(cache.heapSize() <= (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + assertTrue(cache.heapSize() <= (maxSize * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR)); // We expect fairness across the two priorities. // This test makes multi go barely over its limit, in-memory @@ -198,6 +267,7 @@ public class TestLruBlockCache extends TestCase { assertTrue(Arrays.equals(cache.getBlock(singleBlocks[i].blockName).getBuffer(), singleBlocks[i].buf)); assertTrue(Arrays.equals(cache.getBlock(multiBlocks[i].blockName).getBuffer(), multiBlocks[i].buf)); } + manager.stop(); } public void testCacheEvictionThreePriorities() throws Exception { @@ -205,12 +275,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // multi - 0.34f);// memory + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f) + .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -315,6 +389,7 @@ public class TestLruBlockCache extends TestCase { assertEquals(null, cache.getBlock(memoryBlocks[2].blockName)); assertEquals(null, cache.getBlock(memoryBlocks[3].blockName)); + manager.stop(); } // test scan resistance @@ -323,12 +398,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // multi - 0.34f);// memory + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); + LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f) + .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -374,64 +453,7 @@ public class TestLruBlockCache extends TestCase { // Should now have 7 total blocks assertEquals(7, cache.size()); - } - - // test setMaxSize - public void testResizeBlockCache() throws Exception { - - long maxSize = 300000; - long blockSize = calculateBlockSize(maxSize, 31); - - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // multi - 0.34f);// memory - - Block[] singleBlocks = generateFixedBlocks(10, blockSize, "single"); - Block[] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); - Block[] memoryBlocks = generateFixedBlocks(10, blockSize, "memory"); - - // Add all blocks from all priorities - for (int i = 0; i < 10; i++) { - - // Just add single blocks - cache.cacheBlock(singleBlocks[i].blockName, singleBlocks[i].buf); - - // Add and get multi blocks - cache.cacheBlock(multiBlocks[i].blockName, multiBlocks[i].buf); - cache.getBlock(multiBlocks[i].blockName); - - // Add memory blocks as such - cache.cacheBlock(memoryBlocks[i].blockName, memoryBlocks[i].buf, true); - } - - // Do not expect any evictions yet - assertEquals(0, cache.getEvictionCount()); - - // Resize to half capacity plus an extra block (otherwise we evict an extra) - cache.setMaxSize((long) (maxSize * 0.5f)); - - // Should have run a single eviction - assertEquals(1, cache.getEvictionCount()); - - // And we expect 1/2 of the blocks to be evicted - assertEquals(15, cache.getEvictedCount()); - - // And the oldest 5 blocks from each category should be gone - for (int i = 0; i < 5; i++) { - assertEquals(null, cache.getBlock(singleBlocks[i].blockName)); - assertEquals(null, cache.getBlock(multiBlocks[i].blockName)); - assertEquals(null, cache.getBlock(memoryBlocks[i].blockName)); - } - - // And the newest 5 blocks should still be accessible - for (int i = 5; i < 10; i++) { - assertTrue(Arrays.equals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName).getBuffer())); - assertTrue(Arrays.equals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName).getBuffer())); - assertTrue(Arrays.equals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName).getBuffer())); - } + manager.stop(); } private Block[] generateFixedBlocks(int numBlocks, int size, String pfx) { @@ -459,7 +481,7 @@ public class TestLruBlockCache extends TestCase { long roughBlockSize = maxSize / numBlocks; int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize); long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) - + (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT); + + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT); long negateBlockSize = totalOverhead / numEntries; negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD; return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * 0.99f)); @@ -469,10 +491,10 @@ public class TestLruBlockCache extends TestCase { long roughBlockSize = maxSize / numBlocks; int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize); long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) - + (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT); + + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT); long negateBlockSize = totalOverhead / numEntries; negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD; - return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + return ClassSize.align((long) Math.floor((roughBlockSize - negateBlockSize) * LruBlockCacheConfiguration.DEFAULT_ACCEPTABLE_FACTOR)); } private static class Block implements HeapSize { http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 11ded64..ef3fb62 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -58,7 +58,10 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.streams.PositionedOutputs; @@ -206,6 +209,7 @@ public class RFileTest { protected AccumuloConfiguration accumuloConfiguration; public Reader reader; public SortedKeyValueIterator<Key,Value> iter; + private BlockCacheManager manager; public TestRFile(AccumuloConfiguration accumuloConfiguration) { this.accumuloConfiguration = accumuloConfiguration; @@ -265,8 +269,20 @@ public class RFileTest { in = new FSDataInputStream(bais); fileLength = data.length; - LruBlockCache indexCache = new LruBlockCache(100000000, 100000); - LruBlockCache dataCache = new LruBlockCache(100000000, 100000); + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); + try { + manager = BlockCacheManager.getInstance(cc); + } catch (Exception e) { + throw new RuntimeException("Error creating BlockCacheManager", e); + } + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); + cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); + manager.start(cc); + LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); + LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, DefaultConfiguration.getInstance()); reader = new RFile.Reader(_cbr); @@ -279,6 +295,9 @@ public class RFileTest { public void closeReader() throws IOException { reader.close(); in.close(); + if (null != manager) { + manager.stop(); + } } public void seek(Key nk) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index d10ff7b..91b748c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -40,8 +40,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.NamingThreadFactory; @@ -99,6 +99,7 @@ public class TabletServerResourceManager { private final MemoryManagementFramework memMgmt; + private final BlockCacheManager cacheManager; private final BlockCache _dCache; private final BlockCache _iCache; private final BlockCache _sCache; @@ -169,25 +170,24 @@ public class TabletServerResourceManager { long maxMemory = acuConf.getAsBytes(Property.TSERV_MAXMEM); boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.isLoaded(); - long blockSize = acuConf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE); - long dCacheSize = acuConf.getAsBytes(Property.TSERV_DATACACHE_SIZE); - long iCacheSize = acuConf.getAsBytes(Property.TSERV_INDEXCACHE_SIZE); - long sCacheSize = acuConf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE); long totalQueueSize = acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); - String policy = acuConf.get(Property.TSERV_CACHE_POLICY); - if (policy.equalsIgnoreCase("LRU")) { - _iCache = new LruBlockCache(iCacheSize, blockSize); - _dCache = new LruBlockCache(dCacheSize, blockSize); - _sCache = new LruBlockCache(sCacheSize, blockSize); - } else if (policy.equalsIgnoreCase("TinyLFU")) { - _iCache = new TinyLfuBlockCache(iCacheSize, blockSize); - _dCache = new TinyLfuBlockCache(dCacheSize, blockSize); - _sCache = new TinyLfuBlockCache(sCacheSize, blockSize); - } else { - throw new IllegalArgumentException("Unknown Block cache policy " + policy); + try { + cacheManager = BlockCacheManager.getInstance(acuConf); + } catch (Exception e) { + throw new RuntimeException("Error creating BlockCacheManager", e); } + cacheManager.start(acuConf); + + _iCache = cacheManager.getBlockCache(CacheType.INDEX); + _dCache = cacheManager.getBlockCache(CacheType.DATA); + _sCache = cacheManager.getBlockCache(CacheType.SUMMARY); + + long dCacheSize = _dCache.getMaxHeapSize(); + long iCacheSize = _iCache.getMaxHeapSize(); + long sCacheSize = _sCache.getMaxHeapSize(); + Runtime runtime = Runtime.getRuntime(); if (usingNativeMap) { // Still check block cache sizes when using native maps. @@ -543,6 +543,14 @@ public class TabletServerResourceManager { executorService.shutdown(); } + if (null != this.cacheManager) { + try { + this.cacheManager.stop(); + } catch (Exception ex) { + log.error("Error stopping BlockCacheManager", ex); + } + } + for (Entry<String,ExecutorService> entry : threadPools.entrySet()) { while (true) { try {