IGNITE-1681: loadAll threshold is not configurable for 
CacheStoreBalancingWrapper


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/967cfcbb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/967cfcbb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/967cfcbb

Branch: refs/heads/ignite-1816
Commit: 967cfcbb5b87e172a48e619b18e3988f4ef2e428
Parents: 92881e0
Author: Michael Griggs <endian...@gmail.com>
Authored: Wed Nov 11 13:45:48 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Wed Nov 11 13:45:48 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 37 ++++++++++++++
 .../cache/CacheStoreBalancingWrapper.java       |  5 +-
 .../store/GridCacheStoreManagerAdapter.java     |  3 +-
 .../store/GridCacheBalancingStoreSelfTest.java  | 53 +++++++++++++++++---
 4 files changed, 88 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index b7276c9..1b8d41c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -174,9 +174,15 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     /** Default size for onheap SQL row cache size. */
     public static final int DFLT_SQL_ONHEAP_ROW_CACHE_SIZE = 10 * 1024;
 
+    /** Default threshold for concurrent loading of keys from {@link 
CacheStore}. */
+    public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
+
     /** Cache name. */
     private String name;
 
+    /** Threshold for concurrent loading of keys from {@link CacheStore}. */
+    private int storeConcurrentLoadAllThreshold = 
DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
+
     /** Rebalance thread pool size. */
     @Deprecated
     private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
@@ -834,6 +840,37 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     }
 
     /**
+     * Gets the threshold used in cases when values for multiple keys are 
being loaded from an underlying
+     * {@link CacheStore} in parallel. In the situation when several threads 
load the same or intersecting set of keys
+     * and the total number of keys to load is less or equal to this threshold 
then there will be no a second call to
+     * the storage in order to load a key from thread A if the same key is 
already being loaded by thread B.
+     *
+     * The threshold should be controlled wisely. On the one hand if it's set 
to a big value then the interaction with
+     * a storage during the load of missing keys will be minimal. On the other 
hand the big value may result in
+     * significant performance degradation because it is needed to check for 
every key whether it's being loaded or not.
+     *
+     * When not set, default value is {@link 
#DFLT_CONCURRENT_LOAD_ALL_THRESHOLD}.
+     *
+     * @return The concurrent load-all threshold.
+     */
+    public int getStoreConcurrentLoadAllThreshold() {
+        return storeConcurrentLoadAllThreshold;
+    }
+
+    /**
+     * Sets the concurrent load-all threshold used for cases when keys' values 
are being loaded from {@link CacheStore}
+     * in parallel.
+     *
+     * @param storeConcurrentLoadAllThreshold The concurrent load-all 
threshold.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setStoreConcurrentLoadAllThreshold(int 
storeConcurrentLoadAllThreshold) {
+        this.storeConcurrentLoadAllThreshold = storeConcurrentLoadAllThreshold;
+
+        return this;
+    }
+
+    /**
      * Gets key topology resolver to provide mapping from keys to nodes.
      *
      * @return Key topology resolver to provide mapping from keys to nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
index 93075f3..8992326 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStoreBalancingWrapper.java
@@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -39,7 +40,7 @@ import org.jsr166.ConcurrentHashMap8;
  */
 public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
     /** */
-    public static final int DFLT_LOAD_ALL_THRESHOLD = 5;
+    public static final int DFLT_LOAD_ALL_THRESHOLD = 
CacheConfiguration.DFLT_CONCURRENT_LOAD_ALL_THRESHOLD;
 
     /** Delegate store. */
     private CacheStore<K, V> delegate;
@@ -306,4 +307,4 @@ public class CacheStoreBalancingWrapper<K, V> implements 
CacheStore<K, V> {
             return get().get(key);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index dd54da5..6bfafd4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -112,7 +112,8 @@ public abstract class GridCacheStoreManagerAdapter extends 
GridCacheManagerAdapt
 
         store = cacheStoreWrapper(ctx, cfgStore, cfg);
 
-        singleThreadGate = store == null ? null : new 
CacheStoreBalancingWrapper<>(store);
+        singleThreadGate = store == null ? null : new 
CacheStoreBalancingWrapper<>(store,
+            cfg.getStoreConcurrentLoadAllThreshold());
 
         ThreadLocal<SessionData> sesHolder0 = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/967cfcbb/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
index 1e3e4b4..bfbb08c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.Cache;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper;
@@ -127,15 +128,35 @@ public class GridCacheBalancingStoreSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConcurrentLoad() throws Exception {
-        int threads = 5;
+        CacheConfiguration cfg = new CacheConfiguration();
 
-        final int keys = 50;
+        assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 
cfg.getStoreConcurrentLoadAllThreshold());
 
+        doTestConcurrentLoad(5, 50, 
CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentLoadCustomThreshold() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setStoreConcurrentLoadAllThreshold(15);
+
+        assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+        doTestConcurrentLoad(5, 50, cfg.getStoreConcurrentLoadAllThreshold());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestConcurrentLoad(int threads, final int keys, int 
threshold) throws Exception {
         final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
 
         ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys);
 
-        final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new 
CacheStoreBalancingWrapper<>(store);
+        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new 
CacheStoreBalancingWrapper<>(store, threshold);
 
         GridTestUtils.runMultiThreaded(new Runnable() {
             @Override public void run() {
@@ -159,17 +180,35 @@ public class GridCacheBalancingStoreSelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testConcurrentLoadAll() throws Exception {
-        int threads = 5;
+        CacheConfiguration cfg = new CacheConfiguration();
 
-        final int threshold = 5;
+        assertEquals(CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 
cfg.getStoreConcurrentLoadAllThreshold());
 
-        final int keysCnt = 100;
+        doTestConcurrentLoadAll(5, 
CacheStoreBalancingWrapper.DFLT_LOAD_ALL_THRESHOLD, 150);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentLoadAllCustomThreshold() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setStoreConcurrentLoadAllThreshold(15);
 
+        assertEquals(15, cfg.getStoreConcurrentLoadAllThreshold());
+
+        doTestConcurrentLoadAll(5, cfg.getStoreConcurrentLoadAllThreshold(), 
150);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestConcurrentLoadAll(int threads, final int threshold, 
final int keysCnt) throws Exception {
         final CyclicBarrier beforeBarrier = new CyclicBarrier(threads);
 
         ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt);
 
-        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new 
CacheStoreBalancingWrapper<>(store);
+        final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new 
CacheStoreBalancingWrapper<>(store, threshold);
 
         GridTestUtils.runMultiThreaded(new Runnable() {
             @Override public void run() {

Reply via email to