IGNITE-1681: loadAll threshold is not configurable for CacheStoreBalancingWrapper
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/967cfcbb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/967cfcbb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/967cfcbb Branch: refs/heads/ignite-1816 Commit: 967cfcbb5b87e172a48e619b18e3988f4ef2e428 Parents: 92881e0 Author: Michael Griggs <endian...@gmail.com> Authored: Wed Nov 11 13:45:48 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Nov 11 13:45:48 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 37 ++++++++++++++ .../cache/CacheStoreBalancingWrapper.java | 5 +- .../store/GridCacheStoreManagerAdapter.java | 3 +- .../store/GridCacheBalancingStoreSelfTest.java | 53 +++++++++++++++++--- 4 files changed, 88 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index b7276c9..1b8d41c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -174,9 +174,15 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default size for onheap SQL row cache size. */ public static final int DFLT_SQL_ONHEAP_ROW_CACHE_SIZE = 10 * 1024; + /** Default threshold for concurrent loading of keys from {@link CacheStore}. */ + public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5; + /** Cache name. 
*/ private String name; + /** Threshold for concurrent loading of keys from {@link CacheStore}. */ + private int storeConcurrentLoadAllThreshold = DFLT_CONCURRENT_LOAD_ALL_THRESHOLD; + /** Rebalance thread pool size. */ @Deprecated private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE; @@ -834,6 +840,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * Gets the threshold used in cases when values for multiple keys are being loaded from an underlying + * {@link CacheStore} in parallel. In the situation when several threads load the same or intersecting set of keys + * and the total number of keys to load is less than or equal to this threshold then there will be no second call to + * the storage in order to load a key from thread A if the same key is already being loaded by thread B. + * + * The threshold should be controlled wisely. On the one hand if it's set to a big value then the interaction with + * a storage during the load of missing keys will be minimal. On the other hand the big value may result in + * significant performance degradation because every key has to be checked to see whether it is being loaded or not. + * + * When not set, default value is {@link #DFLT_CONCURRENT_LOAD_ALL_THRESHOLD}. + * + * @return The concurrent load-all threshold. + */ + public int getStoreConcurrentLoadAllThreshold() { + return storeConcurrentLoadAllThreshold; + } + + /** + * Sets the concurrent load-all threshold used for cases when keys' values are being loaded from {@link CacheStore} + * in parallel. + * + * @param storeConcurrentLoadAllThreshold The concurrent load-all threshold. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setStoreConcurrentLoadAllThreshold(int storeConcurrentLoadAllThreshold) { + this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold; + + return this; + } + + /** * Gets key topology resolver to provide mapping from keys to nodes. 
* * @return Key topology resolver to provide mapping from keys to nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java index 93075f3..8992326 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java @@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiInClosure; @@ -39,7 +40,7 @@ import org.jsr166.ConcurrentHashMap8; */ public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> { /** */ - public static final int DFLT_LOAD_ALL_THRESHOLD = 5; + public static final int DFLT_LOAD_ALL_THRESHOLD = CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD; /** Delegate store. 
*/ private CacheStore<K, V> delegate; @@ -306,4 +307,4 @@ public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> { return get().get(key); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index dd54da5..6bfafd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -112,7 +112,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt store = cacheStoreWrapper(ctx, cfgStore, cfg); - singleThreadGate = store == null ? null : new CacheStoreBalancingWrapper<>(store); + singleThreadGate = store == null ? 
null : new CacheStoreBalancingWrapper<>(store, + cfg.getStoreConcurrentLoadAllThreshold()); ThreadLocal<SessionData> sesHolder0 = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java index 1e3e4b4..bfbb08c 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper; @@ -127,15 +128,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentLoad() throws Exception { - int threads = 5; + CacheConfiguration cfg = new CacheConfiguration(); - final int keys = 50; + assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold()); + doTestConcurrentLoad(5, 50, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD); + } + + /** + * @throws Exception If failed. 
+ */ + public void testConcurrentLoadCustomThreshold() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setStoreConcurrentLoadAllThreshold(15); + + assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold()); + + doTestConcurrentLoad(5, 50, cfg.getStoreConcurrentLoadAllThreshold()); + } + + /** + * @throws Exception If failed. + */ + private void doTestConcurrentLoad(int threads, final int keys, int threshold) throws Exception { final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys); - final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new CacheStoreBalancingWrapper<>(store); + final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold); GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() { @@ -159,17 +180,35 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentLoadAll() throws Exception { - int threads = 5; + CacheConfiguration cfg = new CacheConfiguration(); - final int threshold = 5; + assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, cfg.getStoreConcurrentLoadAllThreshold()); - final int keysCnt = 100; + doTestConcurrentLoadAll(5, CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 150); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentLoadAllCustomThreshold() throws Exception { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setStoreConcurrentLoadAllThreshold(15); + assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold()); + + doTestConcurrentLoadAll(5, cfg.getStoreConcurrentLoadAllThreshold(), 150); + } + + /** + * @throws Exception If failed. 
+ */ + private void doTestConcurrentLoadAll(int threads, final int threshold, final int keysCnt) throws Exception { final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt); - final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store); + final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store, threshold); GridTestUtils.runMultiThreaded(new Runnable() { @Override public void run() {