Repository: ignite
Updated Branches:
  refs/heads/master 4ae61ce9e -> 40ae36441
IGNITE-802: reworked GridCachePartitionedQueueEntryMoveSelfTest.testQueue Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec5c795a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec5c795a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec5c795a Branch: refs/heads/master Commit: ec5c795aa523cc48c292cfb09e422edcd8a1a42b Parents: d96e0d2 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Sep 10 12:18:44 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Sep 10 12:18:44 2015 +0300 ---------------------------------------------------------------------- ...dCachePartitionedQueueEntryMoveSelfTest.java | 191 +++++++------------ 1 file changed, 66 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ec5c795a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index 4d92b88..1d225a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.datastructures.partitioned; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.UUID; import 
java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; @@ -30,18 +29,15 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -52,11 +48,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; * Cache queue test with changing topology. */ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollectionAbstractTest { - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-802"); - } - /** Queue capacity. */ private static final int QUEUE_CAP = 5; @@ -66,9 +57,6 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection /** Backups count. */ private static final int BACKUP_CNT = 1; - /** Node ID to set manually on node startup. 
*/ - private UUID nodeId; - /** {@inheritDoc} */ @Override protected int gridCount() { return GRID_CNT; @@ -98,116 +86,93 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection return colCfg; } - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (nodeId != null) { - cfg.setNodeId(nodeId); - - nodeId = null; - } - - return cfg; - } - /** * @throws Exception If failed. */ public void testQueue() throws Exception { - try { - startGrids(GRID_CNT); - - final String queueName = "queue-name-" + UUID.randomUUID(); + final String queueName = "queue-test-name"; - System.out.println(U.filler(20, '\n')); + System.out.println(U.filler(20, '\n')); - final CountDownLatch latch1 = new CountDownLatch(1); - //final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); - IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() { - Ignite ignite = grid(0); + IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws IgniteInterruptedCheckedException { + Ignite ignite = grid(0); - IgniteQueue<Integer> queue = ignite.queue(queueName, - QUEUE_CAP, - config(true)); + IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true)); - for (int i = 0; i < QUEUE_CAP * 2; i++) { - if (i == QUEUE_CAP) { - latch1.countDown(); + for (int i = 0; i < QUEUE_CAP * 2; i++) { + if (i == QUEUE_CAP) { + latch1.countDown(); - //U.await(latch2); - } - - try { - info(">>> Putting value: " + i); + U.await(latch2); + } - queue.put(i); + try { + info(">>> Putting value: " + i); - info(">>> Value is in queue: " + i); - } - catch (Error | RuntimeException e) { - error("Failed to put value: " + i, 
e); + queue.put(i); - throw e; - } + info(">>> Value is in queue: " + i); } + catch (Error | RuntimeException e) { + error("Failed to put value: " + i, e); - return null; + throw e; + } } - }); - latch1.await(); + return null; + } + }); - startAdditionalNodes(BACKUP_CNT + 2, queueName); + latch1.await(); - System.out.println(U.filler(20, '\n')); + startAdditionalNodes(BACKUP_CNT + 2, queueName); - //latch2.countDown(); + System.out.println(U.filler(20, '\n')); - IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws IgniteCheckedException { - Ignite ignite = grid(GRID_CNT); + latch2.countDown(); - IgniteQueue<Integer> queue = ignite.queue(queueName, Integer.MAX_VALUE, config(true)); + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws IgniteCheckedException { + Ignite ignite = grid(GRID_CNT); - int cnt = 0; + IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true)); - do { - try { - Integer i = queue.poll(); + int cnt = 0; - if (i != null) { - info(">>> Polled value: " + cnt); + do { + try { + Integer i = queue.poll(); - cnt++; - } - else { - info(">>> Waiting for value..."); + if (i != null) { + info(">>> Polled value: " + cnt); - U.sleep(2000); - } + cnt++; } - catch (Error | RuntimeException e) { - error("Failed to poll value.", e); + else { + info(">>> Waiting for value..."); - throw e; + U.sleep(2000); } } - while (cnt < QUEUE_CAP * 2); + catch (Error | RuntimeException e) { + error("Failed to poll value.", e); - return null; + throw e; + } } - }); + while (cnt < QUEUE_CAP * 2); - fut1.get(); - fut2.get(); - } - finally { - stopAllGrids(); - } + return null; + } + }); + + fut1.get(); + fut2.get(); } /** @@ -218,51 +183,27 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection * @throws Exception If failed. 
*/ private void startAdditionalNodes(int cnt, String queueName) throws Exception { - AffinityFunction aff = jcache(0).getConfiguration(CacheConfiguration.class).getAffinity(); - AffinityKeyMapper mapper = jcache(0).getConfiguration(CacheConfiguration.class).getAffinityMapper(); - - assertNotNull(aff); - assertNotNull(mapper); - - int part = aff.partition(mapper.affinityKey(queueName)); + IgniteQueue queue = ignite(0).queue(queueName, 0, null); - Collection<ClusterNode> nodes = grid(0).cluster().nodes(); + CacheConfiguration cCfg = getQueueCache(queue); - Collection<ClusterNode> aff0 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName); - Collection<ClusterNode> aff1 = nodes(aff, part, nodes); + Collection<ClusterNode> aff1 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName); - assertEquals(new ArrayList<>(aff0), new ArrayList<>(aff1)); + for (int i = 0, id = GRID_CNT; i < cnt; i++) { + startGrid(id++); - Collection<ClusterNode> aff2; - Collection<ClusterNode> tmpNodes; + awaitPartitionMapExchange(); - int retries = 10000; + Collection<ClusterNode> aff2 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName); - do { - tmpNodes = new ArrayList<>(cnt); + if (!aff1.iterator().next().equals(aff2.iterator().next())) { + info("Moved queue to new primary node [oldAff=" + aff1 + ", newAff=" + aff2 + ']'); - for (int i = 0; i < cnt; i++) - tmpNodes.add(new GridTestNode(UUID.randomUUID())); - - aff2 = nodes(aff, part, F.concat(true, tmpNodes, nodes)); - - if (retries-- < 0) - throw new IgniteCheckedException("Failed to find node IDs to change current affinity mapping."); + return; + } } - while (F.containsAny(aff1, aff2)); - - int i = GRID_CNT; - - // Start several additional grids. 
- for (UUID id : F.nodeIds(tmpNodes)) { - nodeId = id; - - startGrid(i++); - } - - aff2 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName); - assertFalse("Unexpected affinity [aff1=" + aff1 + ", aff2=" + aff2 + ']', F.containsAny(aff1, aff2)); + throw new IgniteCheckedException("Unable to move the queue to a new primary node"); } /**