IGNITE-1239 - Added test for reopened ticket.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/866fb415 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/866fb415 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/866fb415 Branch: refs/heads/ignite-843 Commit: 866fb41525957555231fca11c5853731b9473170 Parents: 06fdd7d Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Mon Sep 14 16:09:37 2015 -0700 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Sep 14 16:09:37 2015 -0700 ---------------------------------------------------------------------- ...CacheScanPartitionQueryFallbackSelfTest.java | 105 ++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/866fb415/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index cb3a3bf..df310b4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -26,13 +26,19 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -48,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; @@ -67,7 +74,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT private static final int GRID_CNT = 3; /** Keys count. */ - private static final int KEYS_CNT = 5000; + private static final int KEYS_CNT = 50 * RendezvousAffinityFunction.DFLT_PARTITION_COUNT; /** Ip finder. */ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -261,6 +268,79 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } /** + * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with + * scan query. + * + * @throws Exception In case of error. + */ + public void testScanFallbackOnRebalancingCursor() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1239"); + + cacheMode = CacheMode.PARTITIONED; + clientMode = false; + backups = 1; + commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory(); + + try { + Ignite ignite = startGrids(GRID_CNT); + + fillCache(ignite); + + final AtomicBoolean done = new AtomicBoolean(false); + + IgniteInternalFuture fut1 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 5; i++) { + startGrid(GRID_CNT + i); + + U.sleep(500); + } + + done.set(true); + + return null; + } + }, 1); + + final AtomicInteger nodeIdx = new AtomicInteger(); + + IgniteInternalFuture fut2 = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + int nodeId = nodeIdx.getAndIncrement(); + + IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null); + + while (!done.get()) { + int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions()); + + try { + QueryCursor<Cache.Entry<Integer, Integer>> cur = + cache.query(new ScanQuery<Integer, Integer>(part)); + + U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']'); + + doTestScanQueryCursor(cur, part); + } + catch (ClusterGroupEmptyCheckedException e) { + log.warning("Invalid partition: " + part, e); + } + } + + return null; + } + }, GRID_CNT); + + fut1.get(); + fut2.get(); + } + finally { + stopAllGrids(); + } + } + + /** * Scan should try first remote node and fallbacks to second remote node. * * @throws Exception If failed. @@ -391,6 +471,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } /** + * @param cur Query cursor. + * @param part Partition number. + */ + protected void doTestScanQueryCursor( + QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException { + + Map<Integer, Integer> map = entries.get(part); + + assert map != null; + + int cnt = 0; + + for (Cache.Entry<Integer, Integer> e : cur) { + + assertEquals(map.get(e.getKey()), e.getValue()); + + cnt++; + } + + assertEquals("Invalid number of entries for partition: " + part, map.size(), cnt); + } + + /** * @param cctx Cctx. */ private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {