gg-12688 : Fixed updateSequence in ClientTopology.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ac7f2129
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ac7f2129
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ac7f2129

Branch: refs/heads/ignite-6181-1
Commit: ac7f21298f8fd49c2a42034fb955d3c78089ed70
Parents: 5a6808f
Author: Ilya Lantukh <[email protected]>
Authored: Mon Aug 28 20:31:18 2017 +0300
Committer: Ilya Lantukh <[email protected]>
Committed: Mon Aug 28 20:31:18 2017 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        | 18 +++++---
 .../GridDhtPartitionsExchangeFuture.java        | 15 +++++--
 .../IgnitePdsCacheRebalancingAbstractTest.java  | 44 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c8856fd..299394f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -1092,20 +1092,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
         try {
             for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                if (!e.getValue().containsKey(p))
+                GridDhtPartitionMap partMap = e.getValue();
+
+                if (!partMap.containsKey(p))
                     continue;
 
-                if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) {
+                if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) {
                     if (haveHistory)
-                        e.getValue().put(p, MOVING);
+                        partMap.put(p, MOVING);
                     else {
-                        e.getValue().put(p, RENTING);
+                        partMap.put(p, RENTING);
 
                         result.add(e.getKey());
                     }
+
+                    partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
+
+                    U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                        "[nodeId=" + e.getKey() + ", groupId=" + grpId +
+                        ", partId=" + p + ", haveHistory=" + haveHistory + "]");
                 }
-                else if (owners.contains(e.getKey()))
-                    e.getValue().put(p, OWNING);
             }
 
             part2node.put(p, owners);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 299284d..240b5f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2437,11 +2437,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void assignPartitionsStates() {
         if (cctx.database().persistenceEnabled()) {
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
+            for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) {
+                if (e.getValue().config().getCacheMode() == CacheMode.LOCAL)
                     continue;
 
-                assignPartitionStates(grp.topology());
+                GridDhtPartitionTopology top;
+
+                CacheGroupContext grpCtx = cctx.cache().cacheGroup(e.getKey());
+
+                if (grpCtx != null)
+                    top = grpCtx.topology();
+                else
+                    top = cctx.exchange().clientTopology(e.getKey());
+
+                assignPartitionStates(top);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ac7f2129/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 7b047f8..2d237cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -96,7 +98,19 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         ccfg2.setQueryEntities(Collections.singleton(qryEntity));
 
-        cfg.setCacheConfiguration(ccfg1, ccfg2);
+        // Do not start filtered cache on coordinator.
+        if (gridName.endsWith("0")) {
+            cfg.setCacheConfiguration(ccfg1, ccfg2);
+        }
+        else {
+            CacheConfiguration ccfg3 = cacheConfiguration("filtered");
+            ccfg3.setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE);
+            ccfg3.setBackups(1);
+            ccfg3.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg3.setNodeFilter(new CoordinatorNodeFilter());
+
+            cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3);
+        }
 
         MemoryConfiguration memCfg = new MemoryConfiguration();
 
@@ -501,7 +515,23 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      * @throws Exception If failed.
      */
     public void testForceRebalance() throws Exception {
-        final Ignite ig = startGrids(4);
+        testForceRebalance(cacheName);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testForceRebalanceClientTopology() throws Exception {
+        testForceRebalance("filtered");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testForceRebalance(String cacheName) throws Exception {
+        startGrids(4);
+
+        final Ignite ig = grid(1);
 
         ig.active(true);
 
@@ -653,4 +683,14 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                 '}';
         }
     }
+
+    /**
+     *
+     */
+    private static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.order() > 1;
+        }
+    }
 }

Reply via email to