ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d47d9b78 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d47d9b78 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d47d9b78 Branch: refs/heads/ignite-5578 Commit: d47d9b785048c96b2b0ffd5b22046050a7c20c78 Parents: 0b9ba7a Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 12 10:30:44 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 12 10:30:44 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 5 +- .../GridDhtPartitionsExchangeFuture.java | 64 ++++++++++++++++++-- 2 files changed, 61 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 6794c2b..45586c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1224,16 +1224,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param topVer Topology version. * @param grpId Cache group ID. * @return Affinity assignments. */ - public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) { + public GridAffinityAssignmentCache affinity(Integer grpId) { CacheGroupHolder grpHolder = grpHolders.get(grpId); assert grpHolder != null : grpId; - return grpHolder.affinity().assignments(topVer); + return grpHolder.affinity(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d47d9b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1039392..81b288c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMess import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -1711,6 +1713,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + /** + * @param affReq Cache group IDs. + * @param cachesAff Optional already prepared affinity. + * @return Affinity. + */ private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq, @Nullable Map<Integer, CacheGroupAffinity> cachesAff) { assert !F.isEmpty(affReq); @@ -1720,7 +1727,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Integer grpId : affReq) { if (!cachesAff.containsKey(grpId)) { - List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId); + List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topologyVersion()); cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign)); } @@ -1743,8 +1750,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!grp.isLocal()) { if (localJoinExchange() && grp.affinity().lastVersion().topologyVersion() == -1L) { List<List<ClusterNode>> aff = grp.affinity().calculate(topologyVersion(), - discoEvt, - discoCache); + discoEvt, + discoCache); grp.affinity().initialize(topologyVersion(), aff); } @@ -1756,9 +1763,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, CacheGroupAffinity> cachesAff = null; - for (GridDhtPartitionsSingleMessage msg : msgs.values()) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage msg = e.getValue(); + + // Apply update counters after all single messages are received. for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = grp != null ? grp.topology() : @@ -1772,8 +1783,41 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - if (affReq != null) + if (affReq != null) { cachesAff = initCachesAffinity(affReq, cachesAff); + + UUID nodeId = e.getKey(); + + // If node requested affinity on join and partitions are not created, then + // all affinity partitions should be in MOVING state. + for (Integer grpId : affReq) { + GridDhtPartitionMap partMap = msg.partitions().get(grpId); + + if (partMap == null || F.isEmpty(partMap.map())) { + if (partMap == null) { + partMap = new GridDhtPartitionMap(nodeId, + 1L, + topologyVersion(), + new GridPartitionStateMap(), + false); + } + + AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(topologyVersion()); + + for (int p = 0; p < aff.assignment().size(); p++) { + if (aff.getIds(p).contains(nodeId)) + partMap.put(p, GridDhtPartitionState.MOVING); + } + + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(grpId, this); + + top.update(exchId, partMap); + } + } + } } if (discoEvt.type() == EVT_NODE_JOINED) @@ -2039,6 +2083,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grp.affinity().initialize(topologyVersion(), assignments0); + try { + grp.topology().initPartitions(this); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Interrupted when initialize local partitions."); + + return; + } + cnt++; } } @@ -2100,6 +2153,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * Updates partition map in all caches. * + * @param node Node sent message. * @param msg Partitions single message. */ private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {