ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8fb60ffc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8fb60ffc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8fb60ffc Branch: refs/heads/ignite-5578-locJoin Commit: 8fb60ffcff592b7128ae88151f0fa07c016e0126 Parents: 056847c Author: sboikov <[email protected]> Authored: Wed Jul 19 12:50:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 19 12:50:36 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 14 ++++ .../discovery/GridDiscoveryManager.java | 31 +++++++-- .../cache/CacheAffinitySharedManager.java | 46 ++++++++++++ .../preloader/CacheGroupAffinityMessage.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 73 +++++--------------- .../preloader/GridDhtPartitionsFullMessage.java | 20 ++++-- 6 files changed, 120 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index f63c5f6..1d8cfdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -85,7 +86,11 @@ public class DiscoCache { /** */ private final IgniteProductVersion minNodeVer; + /** */ + private final AffinityTopologyVersion topVer; + /** + * @param topVer Topology version. * @param state Current cluster state. * @param loc Local node. * @param rmtNodes Remote nodes. @@ -101,6 +106,7 @@ public class DiscoCache { * @param alives Alive nodes. */ DiscoCache( + AffinityTopologyVersion topVer, DiscoveryDataClusterState state, ClusterNode loc, List<ClusterNode> rmtNodes, @@ -114,6 +120,7 @@ public class DiscoCache { Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, Set<UUID> alives) { + this.topVer = topVer; this.state = state; this.loc = loc; this.rmtNodes = rmtNodes; @@ -143,6 +150,13 @@ public class DiscoCache { } /** + * @return Topology version. + */ + public AffinityTopologyVersion version() { + return topVer; + } + + /** * @return Minimum node version. */ public IgniteProductVersion minimumNodeVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 347f6fe..1e34f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -620,7 +620,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ChangeGlobalStateFinishMessage stateFinishMsg = null; if (locJoinEvt) { - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer), + ctx.state().clusterState(), + locNode, + topSnapshot); transitionWaitFut = ctx.state().onLocalJoin(discoCache); } @@ -643,7 +646,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else if (customMsg instanceof ChangeGlobalStateFinishMessage) { ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg); - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache(topSnap.get().topVer, + ctx.state().clusterState(), + locNode, + topSnapshot); topSnap.set(new Snapshot(topSnap.get().topVer, discoCache)); @@ -691,7 +697,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { if (discoCache == null) - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache( + nextTopVer, + ctx.state().clusterState(), + locNode, + topSnapshot); discoCacheHist.put(nextTopVer, discoCache); @@ -761,8 +771,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); - topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); + topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache( + AffinityTopologyVersion.ZERO, + ctx.state().clusterState(), + locNode, + Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -2170,12 +2183,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Called from discovery thread. * + * @param topVer Topology version. * @param state Current state. * @param loc Local node. * @param topSnapshot Topology snapshot. * @return Newly created discovery cache. */ - @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state, + @NotNull private DiscoCache createDiscoCache( + AffinityTopologyVersion topVer, + DiscoveryDataClusterState state, ClusterNode loc, Collection<ClusterNode> topSnapshot) { HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); @@ -2252,6 +2268,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } return new DiscoCache( + topVer, state, loc, Collections.unmodifiableList(rmtNodes), @@ -2394,7 +2411,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, node, - createDiscoCache(null, node, empty), + createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty), empty, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index a59f5d7..bb27613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -51,8 +51,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -1246,6 +1248,50 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param fut Exchange future. + * @param msg Message. + */ + public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) { + final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin(); + + if (F.isEmpty(affReq)) + return; + + final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + + final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity(); + + assert !F.isEmpty(joinedNodeAff) : msg; + assert joinedNodeAff.size() >= affReq.size(); + + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + if (affReq.contains(aff.groupId())) { + assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()); + + CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId()); + + assert affMsg != null; + + List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache()); + + // Calculate ideal assignments. + if (!aff.centralizedAffinityFunction()) + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + + aff.initialize(fut.topologyVersion(), assignments); + + CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); + + assert grp != null; + + grp.topology().initPartitions(fut); + } + } + }); + } + + /** * Called on exchange initiated by server node join. * * @param fut Exchange future. http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index ee4ef45..726054d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -94,7 +94,7 @@ public class CacheGroupAffinityMessage implements Message { if (!cachesAff.containsKey(grpId)) { List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer); - cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign)); + cachesAff.put(grpId, new CacheGroupAffinityMessage(assign)); } } @@ -199,7 +199,7 @@ public class CacheGroupAffinityMessage implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 2; + return 1; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6da7876..2c9119f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1189,10 +1189,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param nodes Nodes. - * @param cachesAff Affinity if was requested by some nodes. + * @param joinedNodeAff Affinity if was requested by some nodes. * @throws IgniteCheckedException If failed. */ - private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff) + private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) throws IgniteCheckedException { boolean singleNode = nodes.size() == 1; @@ -1212,15 +1212,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (ClusterNode node : nodes) { GridDhtPartitionsFullMessage sndMsg = msg; - if (cachesAff != null) { + if (joinedNodeAff != null) { if (singleNode) - msg.cachesAffinity(cachesAff); + msg.joinedNodeAffinity(joinedNodeAff); else { GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id()); if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) { - if (msgWithAff == null) - msgWithAff = msg.copyWithAffinity(cachesAff); + if (msgWithAff == null) { + msgWithAff = msg.copy(); + + msgWithAff.joinedNodeAffinity(joinedNodeAff); + } sndMsg = msgWithAff; } @@ -1747,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - Map<Integer, CacheGroupAffinityMessage> cachesAff = null; + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { GridDhtPartitionsSingleMessage msg = e.getValue(); @@ -1770,10 +1773,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); if (affReq != null) { - cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, + joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, topologyVersion(), affReq, - cachesAff); + joinedNodeAff); UUID nodeId = e.getKey(); @@ -1880,7 +1883,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (!nodes.isEmpty()) - sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null); + sendAllPartitions(nodes, joinedNodeAff); onDone(exchangeId().topologyVersion(), err); } @@ -1919,19 +1922,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (n != null) { Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - Collection<CacheGroupAffinityMessage> cachesAff = null; + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; if (affReq != null) { - Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages( + joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages( cctx, msg.exchangeId().topologyVersion(), affReq, null); - - cachesAff = affMap.values(); } - sendAllPartitions(F.asList(n), cachesAff); + sendAllPartitions(F.asList(n), joinedNodeAff); } } catch (IgniteCheckedException e) { @@ -2055,46 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); - if (localJoinExchange() && affReq != null) { - Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); - - Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity(); - - assert !F.isEmpty(cachesAff) : msg; - assert cachesAff.size() >= affReq.size(); - - int cnt = 0; - - for (CacheGroupAffinityMessage aff : cachesAff) { - if (affReq.contains(aff.groupId())) { - CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); - - assert grp != null : aff.groupId(); - assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion()); - - List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache); - - // Calculate ideal assignments. - if (!grp.affinity().centralizedAffinityFunction()) - grp.affinity().calculate(topologyVersion(), discoEvt, discoCache); - - grp.affinity().initialize(topologyVersion(), assignments); - - try { - grp.topology().initPartitions(this); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Interrupted when initialize local partitions."); - - return; - } - - cnt++; - } - } - - assert affReq.size() == cnt : cnt; - } + if (localJoinExchange() && affReq != null) + cctx.affinity().onLocalJoin(this, msg); updatePartitionFullMap(msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 1ea8757..a4258c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -456,15 +456,20 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } switch (writer.state()) { + case 5: + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); case 6: - if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + if (!writer.writeByteArray("errsBytes", errsBytes)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("errsBytes", errsBytes)) + if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -515,9 +520,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return false; switch (reader.state()) { + case 5: + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); case 6: - dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + errsBytes = reader.readByteArray("errsBytes"); if (!reader.isLastRead()) return false; @@ -525,7 +537,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 7: - errsBytes = reader.readByteArray("errsBytes"); + joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false;
