IGNITE-8210 Fixed custom event handling for baseline topology change - Fixes #3814.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d79c6409 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d79c6409 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d79c6409 Branch: refs/heads/ignite-7708 Commit: d79c6409bcb8ca3170ce9153db486cac2c537fc4 Parents: 7731669 Author: Sergey Chugunov <[email protected]> Authored: Tue Apr 17 14:28:47 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Apr 17 14:28:47 2018 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 2 +- .../distributed/CacheBaselineTopologyTest.java | 94 ++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index b1899e3..9d5ce05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -322,7 +322,7 @@ public class GridAffinityAssignmentCache { for (DiscoveryEvent event : events.events()) { boolean affinityNode = CU.affinityNode(event.eventNode(), nodeFilter); - if (affinityNode) { + if (affinityNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) { skipCalculation = false; break; http://git-wip-us.apache.org/repos/asf/ignite/blob/d79c6409/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 26502ed..0d59a2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -81,6 +83,12 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { private boolean delayRebalance; /** */ + private Map<String, Object> userAttrs; + + /** */ + private static final String DATA_NODE = "dataNodeUserAttr"; + + /** */ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ @@ -129,6 +137,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { .setWalMode(WALMode.LOG_ONLY) ); + if (userAttrs != null) + cfg.setUserAttributes(userAttrs); + if (client) cfg.setClientMode(true); @@ -139,6 +150,89 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { } /** + * Verifies that rebalance on cache with Node Filter happens when BaselineTopology changes. + * + * @throws Exception + */ + public void testRebalanceForCacheWithNodeFilter() throws Exception { + try { + final int EMPTY_NODE_IDX = 2; + + userAttrs = U.newHashMap(1); + userAttrs.put(DATA_NODE, true); + + startGrids(2); + + userAttrs.put(DATA_NODE, false); + + IgniteEx ignite = startGrid(2); + + ignite.cluster().active(true); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache = + ignite.createCache( + new CacheConfiguration<Integer, Integer>() + .setName(CACHE_NAME) + .setCacheMode(PARTITIONED) + .setBackups(1) + .setPartitionLossPolicy(READ_ONLY_SAFE) + .setAffinity(new RendezvousAffinityFunction(32, null)) + .setNodeFilter(new DataNodeFilter()) + ); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + Thread.sleep(500); + + printSizesDataNodes(NODE_COUNT - 1, EMPTY_NODE_IDX); + + userAttrs.put(DATA_NODE, true); + + startGrid(3); + + ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion()); + + awaitPartitionMapExchange(); + + Thread.sleep(500); + + printSizesDataNodes(NODE_COUNT, EMPTY_NODE_IDX); + } + finally { + userAttrs = null; + } + } + + /** */ + private void printSizesDataNodes(int nodesCnt, int emptyNodeIdx) { + for (int i = 0; i < nodesCnt; i++) { + IgniteEx ig = grid(i); + + int locSize = ig.cache(CACHE_NAME).localSize(CachePeekMode.PRIMARY); + + if (i == emptyNodeIdx) + assertEquals("Cache local size on " + + i + + " node is expected to be zero", 0, locSize); + else + assertTrue("Cache local size on " + + i + + " node is expected to be non zero", locSize > 0); + } + } + + /** */ + private static class DataNodeFilter implements IgnitePredicate<ClusterNode> { + + @Override public boolean apply(ClusterNode clusterNode) { + return clusterNode.attribute(DATA_NODE); + } + } + + /** * @throws Exception If failed. */ public void testTopologyChangesWithFixedBaseline() throws Exception {
