Merge branch 'ignite-5578-locJoin' into ignite-5578 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java # modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5978213e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5978213e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5978213e Branch: refs/heads/ignite-5578 Commit: 5978213e1e8c8d81d4b33ced4e38bec1edd5cee5 Parents: a7cb829 ba46cbd Author: sboikov <[email protected]> Authored: Wed Jul 19 13:13:15 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 19 13:13:15 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 3 +- .../discovery/GridDiscoveryManager.java | 5 +- .../cache/CacheAffinitySharedManager.java | 52 ++++++++++++++++++-- .../GridCachePartitionExchangeManager.java | 17 ------- .../preloader/CacheGroupAffinityMessage.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 7 ++- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- 7 files changed, 62 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 6ec9b73,1e34f0c..8d309ed --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@@ -697,7 -697,11 +697,7 @@@ public class GridDiscoveryManager exten // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { if (discoCache == null) - discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(), locNode, topSnapshot); - discoCache = createDiscoCache( - nextTopVer, - ctx.state().clusterState(), - locNode, - topSnapshot); ++ discoCache = createDiscoCache(nextTopVer,ctx.state().clusterState(), locNode, topSnapshot); discoCacheHist.put(nextTopVer, discoCache); @@@ -767,8 -771,11 +767,8 @@@ topHist.clear(); - topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache( - AffinityTopologyVersion.ZERO, - ctx.state().clusterState(), - locNode, - Collections.<ClusterNode>emptySet()))); + topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); ++ createDiscoCache(AffinityTopologyVersion.ZERO,ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index ba6a22b,bb27613..1fc59bb --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@@ -1244,130 -1247,51 +1246,174 @@@ public class CacheAffinitySharedManager return grpHolder.affinity(); } + public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) { + final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + + log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']'); + + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + ExchangeDiscoveryEvents evts = fut.context().events(); + + Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff(); + + List<List<ClusterNode>> idealAssignment = + aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache()); + + CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId()) : null; + + List<List<ClusterNode>> newAssignment; + + if (affMsg != null) { + Map<Integer, GridLongList> diff = affMsg.assignmentsDiff(); + + assert !F.isEmpty(diff); + + newAssignment = new ArrayList<>(idealAssignment); + + for (Map.Entry<Integer, GridLongList> e : diff.entrySet()) { + GridLongList assign = e.getValue(); + + newAssignment.set(e.getKey(), CacheGroupAffinityMessage.toNodes(assign, + nodesByOrder, + evts.discoveryCache())); + } + } + else + newAssignment = idealAssignment; + + aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + } + }); + } + + public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) + throws IgniteCheckedException { + final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin(); + + final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + + final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity(); + + assert !F.isEmpty(joinedNodeAff) : msg; + assert joinedNodeAff.size() >= affReq.size(); + + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + if (affReq.contains(aff.groupId())) { + assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()); + + CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId()); + + assert affMsg != null; + + ExchangeDiscoveryEvents evts = fut.context().events(); + + List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache()); + + // Calculate ideal assignments. + if (!aff.centralizedAffinityFunction()) + aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache()); + + aff.initialize(evts.topologyVersion(), assignments); + + CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); + + assert grp != null; + + grp.topology().initPartitions(fut); + } + } + }); + } + + public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean crd) + throws IgniteCheckedException { + final ExchangeDiscoveryEvents evts = fut.context().events(); + + log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']'); + + assert evts.serverJoin() && !evts.serverLeft(); + + WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd); + + setWaitRebalanceInfo(waitRebalanceInfo, evts.waitRebalanceEventVersion(), crd); + } + + public Map<Integer, CacheGroupAffinityMessage> mergeExchangesInitAffinityOnServerLeft( + GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException + { + final ExchangeDiscoveryEvents evts = fut.context().events(); + + assert evts.serverLeft(); + + log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']'); + + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + AffinityTopologyVersion topVer = evts.topologyVersion(); + + CacheGroupHolder cache = groupHolder(topVer, desc); + + cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache()); + } + }); + + Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(), + fut, + NODE_TO_ORDER, + true); + + return CacheGroupAffinityMessage.createAffinityDiffMessages(diff); + } + /** + * @param fut Exchange future. + * @param msg Message. + */ + public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) { + final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin(); + + if (F.isEmpty(affReq)) + return; + + final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + + final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity(); + + assert !F.isEmpty(joinedNodeAff) : msg; + assert joinedNodeAff.size() >= affReq.size(); + + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + if (affReq.contains(aff.groupId())) { + assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()); + + CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId()); + + assert affMsg != null; + + List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache()); + + // Calculate ideal assignments. + if (!aff.centralizedAffinityFunction()) + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + + aff.initialize(fut.topologyVersion(), assignments); + + CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); + + assert grp != null; + + grp.topology().initPartitions(fut); + } + } + }); + } + + /** * Called on exchange initiated by server node join. * * @param fut Exchange future. http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9ae4032,4f572df..17bea14 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -1264,12 -1201,10 +1266,13 @@@ public class GridDhtPartitionsExchangeF /** * @param nodes Nodes. + * @param joinedNodeAff Affinity if was requested by some nodes. * @throws IgniteCheckedException If failed. */ - private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) + private void sendAllPartitions(Collection<ClusterNode> nodes, + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff, + Map<Integer, CacheGroupAffinityMessage> idealAffDiff, + @Nullable GridDhtPartitionExchangeId msgExchId) throws IgniteCheckedException { boolean singleNode = nodes.size() == 1; @@@ -2345,68 -2045,43 +2348,68 @@@ * @param msg Message. */ private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { - assert exchId.equals(msg.exchangeId()) : msg; - assert msg.lastVersion() != null : msg; + try { + assert exchId.equals(msg.exchangeId()) : msg; + assert msg.lastVersion() != null : msg; - synchronized (this) { - if (crd == null || finishState != null) - return; + synchronized (this) { + if (crd == null || finishState != null) + return; - if (!crd.equals(node)) { - if (log.isDebugEnabled()) - log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + - ", nodeId=" + node.id() + ']'); + if (!crd.equals(node)) { + if (log.isDebugEnabled()) + log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + + ", nodeId=" + node.id() + ']'); - if (node.order() > crd.order()) - fullMsgs.put(node, msg); + if (node.order() > crd.order()) + fullMsgs.put(node, msg); - return; + return; + } + + finishState = new FinishState(crd.id(), msg.resultTopologyVersion()); } - finishState = new FinishState(crd.id()); - } + if (exchCtx.mergeExchanges()) { + if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) { + log.info("Received full message, need merge [curFut=" + topologyVersion() + + ", resVer=" + msg.resultTopologyVersion() + ']'); + + cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion()); + } - Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); + if (localJoinExchange()) - cctx.affinity().onJoin(this, msg); ++ cctx.affinity().onLocalJoin(this, msg); + else { + if (exchCtx.events().serverLeft()) + cctx.affinity().mergeExchangesOnServerLeft(this, msg); + else + cctx.affinity().mergeExchangesOnServerJoin(this, false); - if (localJoinExchange() && affReq != null) - cctx.affinity().onLocalJoin(this, msg); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; - updatePartitionFullMap(msg); + grp.topology().beforeExchange(this, true); + } + } + } - IgniteCheckedException err = null; + updatePartitionFullMap(msg); - if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) { - err = new IgniteCheckedException("Cluster state change failed"); + IgniteCheckedException err = null; - cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest()); - } + if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) { + err = new IgniteCheckedException("Cluster state change failed"); - onDone(exchId.topologyVersion(), err); + cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest()); + } + + onDone(exchCtx.events().topologyVersion(), err); + } + catch (IgniteCheckedException e) { + onDone(e); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ----------------------------------------------------------------------
