IgniteCacheAtomicProtocolTest fix - Fixes #1839. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f382afb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f382afb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f382afb Branch: refs/heads/ignite-5024 Commit: 1f382afba6028bcb4463c3926e2d4e7ba6ffc590 Parents: 3eb52a8 Author: Konstantin Dudkov <[email protected]> Authored: Fri Apr 21 11:47:31 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Apr 21 11:47:31 2017 +0300 ---------------------------------------------------------------------- .../atomic/IgniteCacheAtomicProtocolTest.java | 175 +++++++++++++++++-- 1 file changed, 156 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1f382afb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index 29d67e2..5a6b1c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -17,11 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; @@ -29,12 +32,18 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -44,11 +53,13 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; @@ -357,7 +368,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC)); - List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1); + List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1); testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name()); testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name()); @@ -372,30 +383,18 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { else nearCache.put(keys.get(0), map.get(keys.get(0))); - int nodeIdx = 2; - Affinity<Object> aff = clientNode.affinity(TEST_CACHE); - int keysMoved; - - do { - startGrid(nodeIdx); - - awaitPartitionMapExchange(); - - keysMoved = 0; + startGrid(2); - for (Integer key : keys) { - if (!aff.isPrimary(srv0.cluster().localNode(), key)) - keysMoved++; - } + awaitPartitionMapExchange(); - if (keysMoved == keys.size()) - break; + int keysMoved = 0; - nodeIdx++; + for (Integer key : keys) { + if (!aff.isPrimary(srv0.cluster().localNode(), key)) + keysMoved++; } - while (nodeIdx < 10); assertEquals(keys.size(), keysMoved); @@ -826,6 +825,68 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { } /** + * Return list of keys that are primary for given node on given topology, + * but will not be primary after add one new node. + * + * @param ign Ignite. + * @param cacheName Cache name. + * @param size Number of keys. + * @return List of keys. + */ + private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) { + GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context(); + + ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes()); + + AffinityFunction func = cctx.config().getAffinity(); + + AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl( + nodes, + null, + null, + new AffinityTopologyVersion(1, 0), + cctx.config().getBackups()); + + List<List<ClusterNode>> calcAff = func.assignPartitions(ctx); + + String name = getTestIgniteInstanceName(nodes.size()); + + nodes.add(new FakeNode(name)); + + ctx = new GridAffinityFunctionContextImpl( + nodes, + null, + null, + new AffinityTopologyVersion(1, 0), + cctx.config().getBackups()); + + List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx); + + Set<Integer> movedParts = new HashSet<>(); + + UUID localId = ign.cluster().localNode().id(); + + for (int i = 0; i < calcAff.size(); i++) { + if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId)) + movedParts.add(i); + } + + List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 10000; i++) { + int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i)); + + if (movedParts.contains(keyPart)) + keys.add(i); + + if (keys.size() == size) + break; + } + + return keys; + } + + /** * */ public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> { @@ -847,4 +908,80 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { return null; } } + + /** + * + */ + public static class FakeNode implements ClusterNode { + /** */ + private final String consistendId; + /** */ + private final UUID uuid; + + /** */ + public FakeNode(String consistendId) { + this.consistendId = consistendId; + uuid = UUID.randomUUID(); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return uuid; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + return consistendId; + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T attribute(String name) { + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() { + return null; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> attributes() { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + return null; + } + + /** {@inheritDoc} */ + @Override public long order() { + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + return false; + } + } }
