Repository: ignite Updated Branches: refs/heads/ignite-5578-locJoin dd44df9a7 -> 8fb60ffcf
ignite-5578 join Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056847c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056847c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056847c0 Branch: refs/heads/ignite-5578-locJoin Commit: 056847c091d091d678f6c96432d00e196115c3e7 Parents: dd44df9 Author: sboikov <[email protected]> Authored: Wed Jul 19 12:19:03 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 19 12:19:03 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 17 ----- .../preloader/CacheGroupAffinityMessage.java | 75 ++++++++------------ .../GridDhtPartitionsExchangeFuture.java | 4 +- .../preloader/GridDhtPartitionsFullMessage.java | 37 +++------- 4 files changed, 43 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index cfc3671..80121e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2256,23 +2256,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return super.values(); } - /** - * @param exchangeId Exchange ID. - * @return Future. - */ - public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) { - ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size() - 1); - - while (it.hasPrevious()) { - GridDhtPartitionsExchangeFuture fut0 = it.previous(); - - if (fut0.exchangeId().equals(exchangeId)) - return fut0; - } - - return null; - } - /** {@inheritDoc} */ @Override public synchronized String toString() { return S.toString(ExchangeFutureSet.class, this, super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index 5cd5d26..ee4ef45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -45,9 +45,6 @@ public class CacheGroupAffinityMessage implements Message { private static final long serialVersionUID = 0L; /** */ - private int grpId; - - /** */ @GridDirectCollection(GridLongList.class) private List<GridLongList> assigns; @@ -59,12 +56,9 @@ public class CacheGroupAffinityMessage implements Message { } /** - * @param grpId Group ID. * @param assign0 Assignment. */ - private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) { - this.grpId = grpId; - + private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0) { assigns = new ArrayList<>(assign0.size()); for (int i = 0; i < assign0.size(); i++) { @@ -80,13 +74,6 @@ public class CacheGroupAffinityMessage implements Message { } /** - * @return Cache group ID. - */ - int groupId() { - return grpId; - } - - /** * @param cctx Context. * @param topVer Topology version. * @param affReq Cache group IDs. @@ -115,34 +102,46 @@ public class CacheGroupAffinityMessage implements Message { } /** + * @param assign Nodes orders. * @param nodesByOrder Nodes by order cache. * @param discoCache Discovery data cache. - * @return Assignments. + * @return Nodes list. */ - List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) { - List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size()); + public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) { + List<ClusterNode> assign0 = new ArrayList<>(assign.size()); - for (int p = 0; p < assigns.size(); p++) { - GridLongList assign = assigns.get(p); - List<ClusterNode> assign0 = new ArrayList<>(assign.size()); + for (int n = 0; n < assign.size(); n++) { + long order = assign.get(n); - for (int n = 0; n < assign.size(); n++) { - long order = assign.get(n); + ClusterNode affNode = nodesByOrder.get(order); - ClusterNode affNode = nodesByOrder.get(order); + if (affNode == null) { + affNode = discoCache.serverNodeByOrder(order); - if (affNode == null) { - affNode = discoCache.serverNodeByOrder(order); + assert affNode != null : "Failed to find node by order [order=" + order + + ", topVer=" + discoCache.version() + ']'; - assert affNode != null : order; + nodesByOrder.put(order, affNode); + } - nodesByOrder.put(order, affNode); - } + assign0.add(affNode); + } - assign0.add(affNode); - } + return assign0; + } - assignments0.add(assign0); + /** + * @param nodesByOrder Nodes by order cache. + * @param discoCache Discovery data cache. + * @return Assignments. + */ + public List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) { + List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size()); + + for (int p = 0; p < assigns.size(); p++) { + GridLongList assign = assigns.get(p); + + assignments0.add(toNodes(assign, nodesByOrder, discoCache)); } return assignments0; @@ -167,12 +166,6 @@ public class CacheGroupAffinityMessage implements Message { writer.incrementState(); - case 1: - if (!writer.writeInt("grpId", grpId)) - return false; - - writer.incrementState(); - } return true; @@ -194,14 +187,6 @@ public class CacheGroupAffinityMessage implements Message { reader.incrementState(); - case 1: - grpId = reader.readInt("grpId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(CacheGroupAffinityMessage.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index fa30fa2..6da7876 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -227,6 +227,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private ExchangeContext exchCtx; /** */ + @GridToStringExclude private FinishState finishState; /** @@ -873,7 +874,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grp.isLocal() || cacheGroupStopping(grp.groupId())) continue; - if (!localJoinExchange() || grp.affinity().lastVersion().topologyVersion() > 0) + // It is possible affinity is not initialized yet if node joins to cluster. + if (grp.affinity().lastVersion().topologyVersion() > 0) grp.topology().beforeExchange(this, !centralizedAff); } http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index edc9c9e..1ea8757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -103,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private transient boolean compress; /** */ - @GridDirectCollection(CacheGroupAffinityMessage.class) - private Collection<CacheGroupAffinityMessage> cachesAff; + @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class) + private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff; /** * Required by {@link Externalizable}. @@ -148,37 +148,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.partsToReload = partsToReload; cp.partsToReloadBytes = partsToReloadBytes; cp.topVer = topVer; - cp.cachesAff = cachesAff; + cp.joinedNodeAff = joinedNodeAff; } /** - * @param cachesAff Affinity. * @return Message copy. */ - GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) { - assert !F.isEmpty(cachesAff) : cachesAff; - + GridDhtPartitionsFullMessage copy() { GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); copyStateTo(cp); - cp.cachesAff = cachesAff; - return cp; } /** - * @return Affinity. + * @return Caches affinity for joining nodes. */ - @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() { - return cachesAff; + @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() { + return joinedNodeAff; } /** - * @param cachesAff Affinity. + * @param joinedNodeAff Caches affinity for joining nodes. */ - void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) { - this.cachesAff = cachesAff; + void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) { + this.joinedNodeAff = joinedNodeAff; } /** {@inheritDoc} */ @@ -461,11 +456,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } switch (writer.state()) { - case 5: - if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); case 6: if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) @@ -525,13 +515,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return false; switch (reader.state()) { - case 5: - cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); case 6: dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
