Repository: ignite Updated Branches: refs/heads/ignite-5075 68e799c8e -> a5f59f09a
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5f59f09 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5f59f09 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5f59f09 Branch: refs/heads/ignite-5075 Commit: a5f59f09a4848d3e781066a25b65a698763280a1 Parents: 68e799c Author: sboikov <[email protected]> Authored: Tue May 16 17:11:24 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 16 17:11:24 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheGroupsTest.java | 111 ++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a5f59f09/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 20ac80c..601ebed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -18,8 +18,12 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; @@ -706,6 +710,110 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testConcurrentOperationsSameKeys() throws Exception { + final int SRVS = 4; + final int CLIENTS = 4; + final int NODES = SRVS + CLIENTS; + + startGrid(0); + + Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + srv0.createCache(cacheConfiguration(GROUP1, "a0", PARTITIONED, ATOMIC, 1, false)); + srv0.createCache(cacheConfiguration(GROUP1, "a1", PARTITIONED, ATOMIC, 1, false)); + srv0.createCache(cacheConfiguration(GROUP1, "t0", PARTITIONED, TRANSACTIONAL, 1, false)); + srv0.createCache(cacheConfiguration(GROUP1, "t1", PARTITIONED, TRANSACTIONAL, 1, false)); + + final List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 50; i++) + keys.add(i); + + final AtomicBoolean err = new AtomicBoolean(); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture fut1 = updateFuture(NODES, "a0", keys, false, stop, err); + IgniteInternalFuture fut2 = updateFuture(NODES, "a1", keys, true, stop, err); + IgniteInternalFuture fut3 = updateFuture(NODES, "t0", keys, false, stop, err); + IgniteInternalFuture fut4 = updateFuture(NODES, "t1", keys, true, stop, err); + + try { + for (int i = 0; i < 15 && !stop.get(); i++) + U.sleep(1_000); + } + finally { + stop.set(true); + } + + fut1.get(); + fut2.get(); + fut3.get(); + fut4.get(); + + assertFalse("Unexpected error, see log for details", err.get()); + } + + /** + * @param nodes Total number of nodes. + * @param cacheName Cache name. + * @param keys Keys to update. + * @param reverse {@code True} if update in reverse order. + * @param stop Stop flag. + * @param err Error flag. + * @return Update future. + */ + private IgniteInternalFuture updateFuture(final int nodes, + final String cacheName, + final List<Integer> keys, + final boolean reverse, + final AtomicBoolean stop, + final AtomicBoolean err) { + final AtomicInteger idx = new AtomicInteger(); + + return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + Ignite node = ignite(idx.getAndIncrement() % nodes); + + log.info("Start thread [node=" + node.name() + ']'); + + IgniteCache cache = node.cache(cacheName); + + Map<Integer, Integer> map = new LinkedHashMap<>(); + + if (reverse) { + for (int i = keys.size() - 1; i >= 0; i--) + map.put(keys.get(i), 2); + } + else { + for (Integer key : keys) + map.put(key, 1); + } + + while (!stop.get()) + cache.putAll(map); + } + catch (Exception e) { + err.set(true); + + log.error("Unexpected error: " + e, e); + + stop.set(true); + } + + return null; + } + }, nodes * 2, "update-" + cacheName + "-" + reverse); + } + + /** + * @throws Exception If failed. + */ public void testConcurrentOperationsAndCacheDestroy() throws Exception { final int SRVS = 4; final int CLIENTS = 4; @@ -818,7 +926,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { }, "cache-destroy-thread"); try { - U.sleep(30_000); + for (int i = 0; i < 30 && !stop.get(); i++) + U.sleep(1_000); } finally { stop.set(true);
