Repository: ignite
Updated Branches:
  refs/heads/master bf25b5c52 -> e93b28488


ignite-5446 Keep only the late affinity assignment logic in CacheAffinitySharedManager (remove the lateAffAssign flag and the non-late-affinity code paths).


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e93b2848
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e93b2848
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e93b2848

Branch: refs/heads/master
Commit: e93b28488693381fcd232de93087ab8ec1d0f5bb
Parents: bf25b5c
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Jul 11 14:18:52 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Jul 11 14:18:52 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 211 +++++++------------
 1 file changed, 74 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e93b2848/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 548d795..79ab183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -83,9 +83,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     private final long clientCacheMsgTimeout =
         
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT,
 10_000);
 
-    /** Late affinity assignment flag. */
-    private boolean lateAffAssign;
-
     /** Affinity information for all started caches (initialized on 
coordinator). */
     private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new 
ConcurrentHashMap<>();
 
@@ -132,13 +129,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        if (cctx.database().persistenceEnabled() && 
!cctx.kernalContext().config().isLateAffinityAssignment())
-            U.quietAndWarn(log,
-                "Persistence is enabled, but late affinity assignment is 
disabled. " +
-                    "Since it is required for persistence mode, it will be 
implicitly enabled.");
-
-        lateAffAssign = 
cctx.kernalContext().config().isLateAffinityAssignment() || 
cctx.database().persistenceEnabled();
-
         cctx.kernalContext().event().addLocalEventListener(discoLsnr, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
@@ -193,8 +183,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @return {@code True} if minor topology version should be increased.
      */
     boolean onCustomEvent(CacheAffinityChangeMessage msg) {
-        assert lateAffAssign : msg;
-
         if (msg.exchangeId() != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Ignore affinity change message [lastAffVer=" + 
lastAffVer +
@@ -259,9 +247,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param checkGrpId Group ID.
      */
     void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) 
{
-        if (!lateAffAssign)
-            return;
-
         CacheAffinityChangeMessage msg = null;
 
         synchronized (mux) {
@@ -349,13 +334,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param grp Cache group.
      */
     void onCacheGroupCreated(CacheGroupContext grp) {
-        final Integer grpId = grp.groupId();
-
         if (!grpHolders.containsKey(grp.groupId())) {
-            cctx.io().addCacheGroupHandler(grpId, 
GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addCacheGroupHandler(grp.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(grpId, nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 });
         }
@@ -692,7 +675,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
-        forAllCacheGroups(crd && lateAffAssign, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
+        forAllCacheGroups(crd, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
                 if (exchActions.cacheGroupStopping(aff.groupId()))
                     return;
@@ -772,7 +755,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             Integer grpId = action.descriptor().groupId();
 
             if (gprs.add(grpId)) {
-                if (crd && lateAffAssign)
+                if (crd)
                     initStartedGroupOnCoordinator(fut, 
action.descriptor().groupDescriptor());
                 else {
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
@@ -813,7 +796,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         Set<Integer> stoppedGrps = null;
 
-        if (crd && lateAffAssign) {
+        if (crd) {
             for (ExchangeActions.CacheGroupActionData data : 
exchActions.cacheGroupsToStop()) {
                 if (data.descriptor().config().getCacheMode() != LOCAL) {
                     CacheGroupHolder cacheGrp = 
grpHolders.remove(data.descriptor().groupId());
@@ -1026,32 +1009,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, 
boolean crd) throws IgniteCheckedException {
         boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
 
-        if (lateAffAssign) {
-            if (!locJoin) {
-                forAllCacheGroups(crd, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
-                    @Override public void applyx(GridAffinityAssignmentCache 
aff) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
-
-                        aff.clientEventTopologyChange(fut.discoveryEvent(), 
topVer);
-                    }
-                });
-            }
-            else
-                fetchAffinityOnJoin(fut);
-        }
-        else {
-            if (!locJoin) {
-                forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
-                    @Override public void applyx(GridAffinityAssignmentCache 
aff) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = fut.topologyVersion();
+        if (!locJoin) {
+            forAllCacheGroups(crd, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
+                @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
+                    AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                        aff.clientEventTopologyChange(fut.discoveryEvent(), 
topVer);
-                    }
-                });
-            }
-            else
-                initAffinityNoLateAssignment(fut);
+                    aff.clientEventTopologyChange(fut.discoveryEvent(), 
topVer);
+                }
+            });
         }
+        else
+            fetchAffinityOnJoin(fut);
     }
 
     /**
@@ -1074,11 +1042,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param grpId Cache group ID.
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
+    private void processAffinityAssignmentResponse(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing affinity assignment response [node=" + 
nodeId + ", res=" + res + ']');
 
@@ -1093,8 +1060,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed
      */
     private void 
forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws 
IgniteCheckedException {
-        assert lateAffAssign;
-
         for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
             if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
@@ -1179,10 +1144,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         caches.initStartedCaches(descs);
 
-        if (crd && lateAffAssign) {
+        if (crd) {
             forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut, desc);
+                    CacheGroupHolder cache = 
groupHolder(fut.topologyVersion(), desc);
 
                     if 
(cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                         List<List<ClusterNode>> assignment =
@@ -1247,7 +1212,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         assert desc != null : aff.cacheOrGroupName();
 
         // Do not request affinity from remote nodes if affinity function is 
not centralized.
-        if (!lateAffAssign && !aff.centralizedAffinityFunction())
+        if (!aff.centralizedAffinityFunction())
             return true;
 
         // If local node did not initiate exchange or local node is the only 
cache node in grid.
@@ -1272,31 +1237,31 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         WaitRebalanceInfo waitRebalanceInfo = null;
 
-        if (lateAffAssign) {
-            if (locJoin) {
-                if (crd) {
-                    forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
-                        @Override public void applyx(CacheGroupDescriptor 
desc) throws IgniteCheckedException {
-                            AffinityTopologyVersion topVer = 
fut.topologyVersion();
+        if (locJoin) {
+            if (crd) {
+                forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
+                    @Override public void applyx(CacheGroupDescriptor desc) 
throws IgniteCheckedException {
+                        AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                            CacheGroupHolder cache = groupHolder(fut, desc);
+                        CacheGroupHolder cache = groupHolder(topVer, desc);
 
-                            List<List<ClusterNode>> newAff = 
cache.affinity().calculate(topVer,
-                                fut.discoveryEvent(),
-                                fut.discoCache());
+                        List<List<ClusterNode>> newAff = 
cache.affinity().calculate(topVer,
+                            fut.discoveryEvent(),
+                            fut.discoCache());
 
-                            cache.affinity().initialize(topVer, newAff);
-                        }
-                    });
-                }
-                else
-                    fetchAffinityOnJoin(fut);
+                        cache.affinity().initialize(topVer, newAff);
+                    }
+                });
             }
             else
-                waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
+                fetchAffinityOnJoin(fut);
+        }
+        else {
+            waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                crd);
         }
-        else
-            initAffinityNoLateAssignment(fut);
 
         synchronized (mux) {
             affCalcVer = fut.topologyVersion();
@@ -1305,7 +1270,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
             WaitRebalanceInfo info = this.waitInfo;
 
-            if (crd && lateAffAssign) {
+            if (crd) {
                 if (log.isDebugEnabled()) {
                     log.debug("Computed new affinity after node join [topVer=" 
+ fut.topologyVersion() +
                         ", waitGrps=" + (info != null ? 
groupNames(info.waitGrps.keySet()) : null) + ']');
@@ -1412,7 +1377,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             if (idealAff != null)
                 affCache.idealAssignment(idealAff);
             else {
-                assert !affCache.centralizedAffinityFunction() || 
!lateAffAssign;
+                assert !affCache.centralizedAffinityFunction();
 
                 affCache.calculate(topVer, discoveryEvt, discoCache);
             }
@@ -1439,22 +1404,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         assert !leftNode.isClient() : leftNode;
 
-        boolean centralizedAff;
-
-        if (lateAffAssign) {
-            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (grp.isLocal())
-                    continue;
-
-                grp.affinity().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
-            }
-
-            centralizedAff = true;
-        }
-        else {
-            initAffinityNoLateAssignment(fut);
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
+                continue;
 
-            centralizedAff = false;
+            grp.affinity().calculate(fut.topologyVersion(), 
fut.discoveryEvent(), fut.discoCache());
         }
 
         synchronized (mux) {
@@ -1463,22 +1417,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             this.waitInfo = null;
         }
 
-        return centralizedAff;
-    }
-
-    /**
-     * @param fut Exchange future.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture 
fut) throws IgniteCheckedException {
-        assert !lateAffAssign;
-
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal())
-                continue;
-
-            initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
-        }
+        return true;
     }
 
     /**
@@ -1488,8 +1427,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      */
     private IgniteInternalFuture<?> initCoordinatorCaches(final 
GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new 
ArrayList<>();
 
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
@@ -1512,7 +1449,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                     cctx.io().addCacheGroupHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
-                                processAffinityAssignmentResponse(grpId, 
nodeId, res);
+                                processAffinityAssignmentResponse(nodeId, res);
                             }
                         }
                     );
@@ -1587,15 +1524,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
      * @param desc Cache descriptor.
      * @return Cache holder.
      * @throws IgniteCheckedException If failed.
      */
-    private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, 
final CacheGroupDescriptor desc)
+    private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, final 
CacheGroupDescriptor desc)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId());
 
         if (cacheGrp != null)
@@ -1607,12 +1542,12 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             cctx.io().addCacheGroupHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
-                        processAffinityAssignmentResponse(desc.groupId(), 
nodeId, res);
+                        processAffinityAssignmentResponse(nodeId, res);
                     }
                 }
             );
 
-            cacheGrp = CacheGroupHolder2.create(cctx, desc, 
fut.topologyVersion(), null);
+            cacheGrp = CacheGroupHolder2.create(cctx, desc, topVer, null);
         }
         else
             cacheGrp = new CacheGroupHolder1(grp, null);
@@ -1625,17 +1560,18 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param evt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param crd Coordinator flag.
      * @throws IgniteCheckedException If failed.
      * @return Rabalance info.
      */
-    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final 
GridDhtPartitionsExchangeFuture fut, boolean crd)
+    @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final 
AffinityTopologyVersion topVer,
+        final DiscoveryEvent evt,
+        final DiscoCache discoCache,
+        boolean crd)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
@@ -1645,7 +1581,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 boolean latePrimary = grp.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, 
affCache);
+                initAffinityOnNodeJoin(topVer, evt, discoCache, 
grp.affinity(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1655,11 +1591,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
             forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
-                    CacheGroupHolder cache = groupHolder(fut, desc);
+                    CacheGroupHolder cache = groupHolder(topVer, desc);
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), 
waitRebalanceInfo, latePrimary, affCache);
+                    initAffinityOnNodeJoin(topVer,
+                        evt,
+                        discoCache,
+                        cache.affinity(),
+                        waitRebalanceInfo,
+                        latePrimary,
+                        affCache);
                 }
             });
 
@@ -1668,24 +1610,24 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param evt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is 
not owner.
      * @param affCache Already calculated assignments (to reduce data stored 
in history).
      * @throws IgniteCheckedException If failed.
      */
-    private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
+    private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
+        DiscoveryEvent evt,
+        DiscoCache discoCache,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
         boolean latePrimary,
         Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
-        assert lateAffAssign;
-
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         AffinityTopologyVersion affTopVer = aff.lastVersion();
 
         assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized 
[grp=" + aff.cacheOrGroupName() +
@@ -1695,7 +1637,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         assert aff.idealAssignment() != null : "Previous assignment is not 
available.";
 
-        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, 
fut.discoveryEvent(), fut.discoCache());
+        List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, 
discoCache);
         List<List<ClusterNode>> newAssignment = null;
 
         if (latePrimary) {
@@ -1726,7 +1668,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, 
newAssignment, affCache));
+        aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
     }
 
     /**
@@ -1762,7 +1704,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         ClusterNode curPrimary,
         List<ClusterNode> newNodes,
         WaitRebalanceInfo rebalance) {
-        assert lateAffAssign;
         assert curPrimary != null;
         assert !F.isEmpty(newNodes);
         assert !curPrimary.equals(newNodes.get(0));
@@ -1791,8 +1732,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      */
     public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> 
initAffinityOnNodeLeft(
         final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException {
-        assert lateAffAssign;
-
         IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
 
         if (initFut != null && !initFut.isDone()) {
@@ -1822,8 +1761,6 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      */
     private Map<Integer, Map<Integer, List<UUID>>> 
initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
-        assert lateAffAssign;
-
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
         final WaitRebalanceInfo waitRebalanceInfo = new 
WaitRebalanceInfo(topVer);
@@ -1834,7 +1771,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
-                CacheGroupHolder grpHolder = groupHolder(fut, desc);
+                CacheGroupHolder grpHolder = 
groupHolder(fut.topologyVersion(), desc);
 
                 if (!grpHolder.rebalanceEnabled)
                     return;

Reply via email to