Repository: ignite
Updated Branches:
  refs/heads/master bf25b5c52 -> e93b28488
ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e93b2848 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e93b2848 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e93b2848 Branch: refs/heads/master Commit: e93b28488693381fcd232de93087ab8ec1d0f5bb Parents: bf25b5c Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 11 14:18:52 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 11 14:18:52 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 211 +++++++------------ 1 file changed, 74 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e93b2848/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 548d795..79ab183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -83,9 +83,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final long clientCacheMsgTimeout = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000); - /** Late affinity assignment flag. */ - private boolean lateAffAssign; - /** Affinity information for all started caches (initialized on coordinator). 
*/ private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>(); @@ -132,13 +129,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override protected void start0() throws IgniteCheckedException { super.start0(); - if (cctx.database().persistenceEnabled() && !cctx.kernalContext().config().isLateAffinityAssignment()) - U.quietAndWarn(log, - "Persistence is enabled, but late affinity assignment is disabled. " + - "Since it is required for persistence mode, it will be implicitly enabled."); - - lateAffAssign = cctx.kernalContext().config().isLateAffinityAssignment() || cctx.database().persistenceEnabled(); - cctx.kernalContext().event().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -193,8 +183,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return {@code True} if minor topology version should be increased. */ boolean onCustomEvent(CacheAffinityChangeMessage msg) { - assert lateAffAssign : msg; - if (msg.exchangeId() != null) { if (log.isDebugEnabled()) { log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer + @@ -259,9 +247,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param checkGrpId Group ID. */ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) { - if (!lateAffAssign) - return; - CacheAffinityChangeMessage msg = null; synchronized (mux) { @@ -349,13 +334,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param grp Cache group. 
*/ void onCacheGroupCreated(CacheGroupContext grp) { - final Integer grpId = grp.groupId(); - if (!grpHolders.containsKey(grp.groupId())) { - cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class, + cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(grpId, nodeId, res); + processAffinityAssignmentResponse(nodeId, res); } }); } @@ -692,7 +675,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap caches.updateCachesInfo(exchActions); // Affinity did not change for existing caches. - forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (exchActions.cacheGroupStopping(aff.groupId())) return; @@ -772,7 +755,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Integer grpId = action.descriptor().groupId(); if (gprs.add(grpId)) { - if (crd && lateAffAssign) + if (crd) initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor()); else { CacheGroupContext grp = cctx.cache().cacheGroup(grpId); @@ -813,7 +796,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Set<Integer> stoppedGrps = null; - if (crd && lateAffAssign) { + if (crd) { for (ExchangeActions.CacheGroupActionData data : exchActions.cacheGroupsToStop()) { if (data.descriptor().config().getCacheMode() != LOCAL) { CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId()); @@ -1026,32 +1009,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void onClientEvent(final 
GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { boolean locJoin = fut.discoveryEvent().eventNode().isLocal(); - if (lateAffAssign) { - if (!locJoin) { - forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { - @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - AffinityTopologyVersion topVer = fut.topologyVersion(); - - aff.clientEventTopologyChange(fut.discoveryEvent(), topVer); - } - }); - } - else - fetchAffinityOnJoin(fut); - } - else { - if (!locJoin) { - forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { - @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - AffinityTopologyVersion topVer = fut.topologyVersion(); + if (!locJoin) { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + AffinityTopologyVersion topVer = fut.topologyVersion(); - aff.clientEventTopologyChange(fut.discoveryEvent(), topVer); - } - }); - } - else - initAffinityNoLateAssignment(fut); + aff.clientEventTopologyChange(fut.discoveryEvent(), topVer); + } + }); } + else + fetchAffinityOnJoin(fut); } /** @@ -1074,11 +1042,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param grpId Cache group ID. * @param nodeId Node ID. * @param res Response. 
*/ - private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) { + private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']'); @@ -1093,8 +1060,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed */ private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException { - assert lateAffAssign; - for (CacheGroupDescriptor cacheDesc : caches.allGroups()) { if (cacheDesc.config().getCacheMode() == LOCAL) continue; @@ -1179,10 +1144,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ) throws IgniteCheckedException { caches.initStartedCaches(descs); - if (crd && lateAffAssign) { + if (crd) { forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder cache = groupHolder(fut, desc); + CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc); if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = @@ -1247,7 +1212,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert desc != null : aff.cacheOrGroupName(); // Do not request affinity from remote nodes if affinity function is not centralized. - if (!lateAffAssign && !aff.centralizedAffinityFunction()) + if (!aff.centralizedAffinityFunction()) return true; // If local node did not initiate exchange or local node is the only cache node in grid. 
@@ -1272,31 +1237,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap WaitRebalanceInfo waitRebalanceInfo = null; - if (lateAffAssign) { - if (locJoin) { - if (crd) { - forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { - @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - AffinityTopologyVersion topVer = fut.topologyVersion(); + if (locJoin) { + if (crd) { + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + AffinityTopologyVersion topVer = fut.topologyVersion(); - CacheGroupHolder cache = groupHolder(fut, desc); + CacheGroupHolder cache = groupHolder(topVer, desc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, - fut.discoveryEvent(), - fut.discoCache()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, + fut.discoveryEvent(), + fut.discoCache()); - cache.affinity().initialize(topVer, newAff); - } - }); - } - else - fetchAffinityOnJoin(fut); + cache.affinity().initialize(topVer, newAff); + } + }); } else - waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd); + fetchAffinityOnJoin(fut); + } + else { + waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + crd); } - else - initAffinityNoLateAssignment(fut); synchronized (mux) { affCalcVer = fut.topologyVersion(); @@ -1305,7 +1270,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap WaitRebalanceInfo info = this.waitInfo; - if (crd && lateAffAssign) { + if (crd) { if (log.isDebugEnabled()) { log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() + ", waitGrps=" + (info != null ? 
groupNames(info.waitGrps.keySet()) : null) + ']'); @@ -1412,7 +1377,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (idealAff != null) affCache.idealAssignment(idealAff); else { - assert !affCache.centralizedAffinityFunction() || !lateAffAssign; + assert !affCache.centralizedAffinityFunction(); affCache.calculate(topVer, discoveryEvt, discoCache); } @@ -1439,22 +1404,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert !leftNode.isClient() : leftNode; - boolean centralizedAff; - - if (lateAffAssign) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; - - grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); - } - - centralizedAff = true; - } - else { - initAffinityNoLateAssignment(fut); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; - centralizedAff = false; + grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); } synchronized (mux) { @@ -1463,22 +1417,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap this.waitInfo = null; } - return centralizedAff; - } - - /** - * @param fut Exchange future. - * @throws IgniteCheckedException If failed. 
- */ - private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - assert !lateAffAssign; - - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; - - initAffinity(caches.group(grp.groupId()), grp.affinity(), fut); - } + return true; } /** @@ -1488,8 +1427,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - assert lateAffAssign; - final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @@ -1512,7 +1449,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(grpId, nodeId, res); + processAffinityAssignmentResponse(nodeId, res); } } ); @@ -1587,15 +1524,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param fut Exchange future. + * @param topVer Topology version. * @param desc Cache descriptor. * @return Cache holder. * @throws IgniteCheckedException If failed. 
*/ - private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc) + private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, final CacheGroupDescriptor desc) throws IgniteCheckedException { - assert lateAffAssign; - CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId()); if (cacheGrp != null) @@ -1607,12 +1542,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(desc.groupId(), nodeId, res); + processAffinityAssignmentResponse(nodeId, res); } } ); - cacheGrp = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null); + cacheGrp = CacheGroupHolder2.create(cctx, desc, topVer, null); } else cacheGrp = new CacheGroupHolder1(grp, null); @@ -1625,17 +1560,18 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param fut Exchange future. + * @param topVer Topology version. + * @param evt Discovery event. + * @param discoCache Discovery data cache. * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return Rabalance info. 
*/ - @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) + @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer, + final DiscoveryEvent evt, + final DiscoCache discoCache, + boolean crd) throws IgniteCheckedException { - assert lateAffAssign; - - AffinityTopologyVersion topVer = fut.topologyVersion(); - final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); if (!crd) { @@ -1645,7 +1581,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = grp.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache); + initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache); } return null; @@ -1655,11 +1591,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder cache = groupHolder(fut, desc); + CacheGroupHolder cache = groupHolder(topVer, desc); boolean latePrimary = cache.rebalanceEnabled; - initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache); + initAffinityOnNodeJoin(topVer, + evt, + discoCache, + cache.affinity(), + waitRebalanceInfo, + latePrimary, + affCache); } }); @@ -1668,24 +1610,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param fut Exchange future. + * @param topVer Topology version. + * @param evt Discovery event. + * @param discoCache Discovery data cache. * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. * @param affCache Already calculated assignments (to reduce data stored in history). 
* @throws IgniteCheckedException If failed. */ - private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, + private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer, + DiscoveryEvent evt, + DiscoCache discoCache, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, boolean latePrimary, Map<Object, List<List<ClusterNode>>> affCache) throws IgniteCheckedException { - assert lateAffAssign; - - AffinityTopologyVersion topVer = fut.topologyVersion(); - AffinityTopologyVersion affTopVer = aff.lastVersion(); assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + @@ -1695,7 +1637,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { @@ -1726,7 +1668,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAssignment == null) newAssignment = idealAssignment; - aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); } /** @@ -1762,7 +1704,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ClusterNode curPrimary, List<ClusterNode> newNodes, WaitRebalanceInfo rebalance) { - assert lateAffAssign; assert curPrimary != null; assert !F.isEmpty(newNodes); assert !curPrimary.equals(newNodes.get(0)); @@ -1791,8 +1732,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft( final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException { - assert lateAffAssign; - IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut); if (initFut != null && !initFut.isDone()) { @@ -1822,8 +1761,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - assert lateAffAssign; - final AffinityTopologyVersion topVer = fut.topologyVersion(); final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); @@ -1834,7 +1771,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder grpHolder = groupHolder(fut, desc); + CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc); if (!grpHolder.rebalanceEnabled) return;