GG-12688: Fixed updateSequence in ClientTopology.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ac7f2129 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ac7f2129 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ac7f2129 Branch: refs/heads/ignite-6181-1 Commit: ac7f21298f8fd49c2a42034fb955d3c78089ed70 Parents: 5a6808f Author: Ilya Lantukh <[email protected]> Authored: Mon Aug 28 20:31:18 2017 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Mon Aug 28 20:31:18 2017 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 18 +++++--- .../GridDhtPartitionsExchangeFuture.java | 15 +++++-- .../IgnitePdsCacheRebalancingAbstractTest.java | 44 +++++++++++++++++++- 3 files changed, 66 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index c8856fd..299394f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -1092,20 +1092,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { try { for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - if (!e.getValue().containsKey(p)) + GridDhtPartitionMap partMap = e.getValue(); + + if (!partMap.containsKey(p)) continue; - 
if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) { + if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) { if (haveHistory) - e.getValue().put(p, MOVING); + partMap.put(p, MOVING); else { - e.getValue().put(p, RENTING); + partMap.put(p, RENTING); result.add(e.getKey()); } + + partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); + + U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + + "[nodeId=" + e.getKey() + ", groupId=" + grpId + + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } - else if (owners.contains(e.getKey())) - e.getValue().put(p, OWNING); } part2node.put(p, owners); http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 299284d..240b5f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2437,11 +2437,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void assignPartitionsStates() { if (cctx.database().persistenceEnabled()) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) + for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) { + if (e.getValue().config().getCacheMode() == CacheMode.LOCAL) continue; - 
assignPartitionStates(grp.topology()); + GridDhtPartitionTopology top; + + CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey()); + + if (grpCtx != null) + top = grpCtx.topology(); + else + top = cctx.exchange().clientTopology(e.getKey()); + + assignPartitionStates(top); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 7b047f8..2d237cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; @@ -46,6 +47,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -96,7 +98,19 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb ccfg2.setQueryEntities(Collections.singleton(qryEntity)); - cfg.setCacheConfiguration(ccfg1, ccfg2); + // Do not start filtered cache on coordinator. + if (gridName.endsWith("0")) { + cfg.setCacheConfiguration(ccfg1, ccfg2); + } + else { + CacheConfiguration ccfg3 = cacheConfiguration("filtered"); + ccfg3.setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE); + ccfg3.setBackups(1); + ccfg3.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg3.setNodeFilter(new CoordinatorNodeFilter()); + + cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3); + } MemoryConfiguration memCfg = new MemoryConfiguration(); @@ -501,7 +515,23 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If failed. */ public void testForceRebalance() throws Exception { - final Ignite ig = startGrids(4); + testForceRebalance(cacheName); + } + + /** + * @throws Exception If failed. + */ + public void testForceRebalanceClientTopology() throws Exception { + testForceRebalance("filtered"); + } + + /** + * @throws Exception If failed. + */ + private void testForceRebalance(String cacheName) throws Exception { + startGrids(4); + + final Ignite ig = grid(1); ig.active(true); @@ -653,4 +683,14 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb '}'; } } + + /** + * + */ + private static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.order() > 1; + } + } }
