ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ef68e3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ef68e3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ef68e3 Branch: refs/heads/ignite-5578 Commit: c9ef68e3f21aeec60ee2c8605f849b581a3d6ce4 Parents: 39cccec Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 13 14:59:35 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 13 17:35:17 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 138 ++++++------ .../processors/cache/ExchangeContext.java | 50 ++--- .../cache/ExchangeDiscoveryEvents.java | 52 ++++- .../GridCachePartitionExchangeManager.java | 55 ++--- .../GridDhtPartitionsExchangeFuture.java | 219 +++++++++++-------- .../preloader/GridDhtPartitionsFullMessage.java | 4 + .../CacheExchangeCoalescingTest.java | 73 ------- .../distributed/CacheExchangeMergeTest.java | 73 +++++++ 8 files changed, 361 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index a31ab1c..4ea61a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -1232,38 +1233,45 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param crd Coordinator flag. - * @throws IgniteCheckedException If failed. + * @param grpId Cache group ID. + * @return Affinity assignments. */ - public void onLocalJoin(boolean crd) throws IgniteCheckedException { + public GridAffinityAssignmentCache affinity(Integer grpId) { + CacheGroupHolder grpHolder = grpHolders.get(grpId); + + assert grpHolder != null : debugGroupName(grpId); + return grpHolder.affinity(); } - public void processDiscoveryEvents(ExchangeDiscoveryEvents evts) { - AffinityTopologyVersion topVer = evts.topologyVersion(); + public Map<Integer, Map<Integer, List<UUID>>> onTopologyChange(GridDhtPartitionsExchangeFuture fut, + boolean crd) + throws IgniteCheckedException + { + final ExchangeDiscoveryEvents evts = fut.context().events(); + + assert evts.serverJoin() || evts.serverLeft(); if (evts.serverLeft()) { + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + AffinityTopologyVersion topVer = evts.topologyVersion(); - } - else if (evts.serverJoin()) { + CacheGroupHolder cache = groupHolder(topVer, desc); - } - else { + cache.affinity().calculate(topVer, evts.event(), evts.discoveryCache()); + } + }); + return initAffinityOnNodeLeft0(evts.topologyVersion(), fut); } - } - - - /** - * @param grpId Cache group ID. - * @return Affinity assignments. - */ - public GridAffinityAssignmentCache affinity(Integer grpId) { - CacheGroupHolder grpHolder = grpHolders.get(grpId); + else { + WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd); - assert grpHolder != null : debugGroupName(grpId); + setWaitRebalanceInfo(waitRebalanceInfo, evts.topologyVersion(), crd); - return grpHolder.affinity(); + return null; + } } /** @@ -1299,25 +1307,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else fetchAffinityOnJoin(fut); } - else { - waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(), - fut.discoveryEvent(), - fut.discoCache(), - crd); - } + else + waitRebalanceInfo = initAffinityOnNodeJoin(fut.context().events(), crd); - synchronized (mux) { - affCalcVer = fut.topologyVersion(); + setWaitRebalanceInfo(waitRebalanceInfo, fut.topologyVersion(), crd); + } - this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; + private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) { + affCalcVer = topVer; - WaitRebalanceInfo info = this.waitInfo; + this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; - if (crd) { - if (log.isDebugEnabled()) { - log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() + - ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); - } + WaitRebalanceInfo info = this.waitInfo; + + if (crd) { + if (log.isDebugEnabled()) { + log.debug("Computed new affinity after node join [topVer=" + topVer + + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } } @@ -1654,17 +1660,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param topVer Topology version. - * @param evt Discovery event. - * @param discoCache Discovery data cache. * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return Rabalance info. */ - @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer, - final DiscoveryEvent evt, - final DiscoCache discoCache, - boolean crd) + @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final ExchangeDiscoveryEvents evts, boolean crd) throws IgniteCheckedException { final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); @@ -1675,23 +1675,27 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = grp.rebalanceEnabled(); - initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache); + initAffinityOnNodeJoin(evts, + evts.groupAddedOnExchange(grp.groupId(), grp.receivedFrom()), + grp.affinity(), + null, + latePrimary, + affCache); } return null; } else { - final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); + final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.topologyVersion()); forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder cache = groupHolder(topVer, desc); + CacheGroupHolder cache = groupHolder(evts.topologyVersion(), desc); boolean latePrimary = cache.rebalanceEnabled; - initAffinityOnNodeJoin(topVer, - evt, - discoCache, + initAffinityOnNodeJoin(evts, + evts.groupAddedOnExchange(desc.groupId(), desc.receivedFrom()), cache.affinity(), waitRebalanceInfo, latePrimary, @@ -1704,24 +1708,33 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param topVer Topology version. - * @param evt Discovery event. - * @param discoCache Discovery data cache. * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. * @param affCache Already calculated assignments (to reduce data stored in history). * @throws IgniteCheckedException If failed. */ - private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer, - DiscoveryEvent evt, - DiscoCache discoCache, + private void initAffinityOnNodeJoin( + ExchangeDiscoveryEvents evts, + boolean addedOnExchnage, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, boolean latePrimary, Map<Object, List<List<ClusterNode>>> affCache) throws IgniteCheckedException { + if (addedOnExchnage) { + if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) { + List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(), + evts.event(), + evts.discoveryCache()); + + aff.initialize(evts.topologyVersion(), newAff); + } + + return; + } + AffinityTopologyVersion affTopVer = aff.lastVersion(); assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + @@ -1731,7 +1744,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache); + List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.event(), evts.discoveryCache()); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { @@ -1743,7 +1756,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null; if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { - assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary; + assert cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null : curPrimary; List<ClusterNode> nodes0 = latePrimaryAssignment(aff, p, @@ -1762,7 +1775,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAssignment == null) newAssignment = idealAssignment; - aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); + aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); } /** @@ -1834,7 +1847,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap initFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> initFut) { try { - resFut.onDone(initAffinityOnNodeLeft0(fut)); + resFut.onDone(initAffinityOnNodeLeft0(fut.topologyVersion(), fut)); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -1845,7 +1858,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return resFut; } else - return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut)); + return new GridFinishedFuture<>(initAffinityOnNodeLeft0(fut.topologyVersion(), fut)); } /** @@ -1853,10 +1866,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ - private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) + private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final AffinityTopologyVersion topVer, + final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { - final AffinityTopologyVersion topVer = fut.topologyVersion(); - final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); final Collection<ClusterNode> aliveNodes = cctx.discovery().nodes(topVer); @@ -1865,7 +1877,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { - CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc); + CacheGroupHolder grpHolder = groupHolder(topVer, desc); if (!grpHolder.rebalanceEnabled) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index eeb7b23..1d7b73a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -17,14 +17,9 @@ package org.apache.ignite.internal.processors.cache; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.jetbrains.annotations.Nullable; @@ -39,24 +34,25 @@ public class ExchangeContext { private boolean fetchAffOnJoin; /** */ - private final boolean coalescing; + private final boolean merge; /** */ - private AffinityTopologyVersion resTopVer; - - /** */ - private final Map<Integer, List<List<ClusterNode>>> affMap = new HashMap<>(); + private final ExchangeDiscoveryEvents evts; /** * @param protocolVer Protocol version. - * @param topVer Topology version. + * @param fut Exchange future. */ - public ExchangeContext(int protocolVer, AffinityTopologyVersion topVer) { + public ExchangeContext(int protocolVer, GridDhtPartitionsExchangeFuture fut) { fetchAffOnJoin = protocolVer == 1; - coalescing = protocolVer > 1; + merge = protocolVer > 1; + + evts = new ExchangeDiscoveryEvents(fut); + } - this.resTopVer = topVer; + public ExchangeDiscoveryEvents events() { + return evts; } /** @@ -84,27 +80,7 @@ public class ExchangeContext { return requestGrpsAffOnJoin; } - public boolean coalescing() { - return coalescing; - } - - public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx, GridAffinityAssignmentCache aff) { - List<List<ClusterNode>> assignment = affMap.get(aff.groupId()); - - if (assignment != null) - return assignment; - - AffinityTopologyVersion affTopVer = aff.lastVersion(); - - assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + - ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; - - List<List<ClusterNode>> curAff = aff.assignments(affTopVer); - - assert aff.idealAssignment() != null : "Previous assignment is not available."; - - affMap.put(aff.groupId(), curAff); - - return curAff; + public boolean canMergeExchanges() { + return merge; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java index fced92e..7d3e256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java @@ -17,13 +17,19 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; +import static com.sun.corba.se.impl.util.RepositoryId.cache; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -39,6 +45,12 @@ public class ExchangeDiscoveryEvents { private DiscoCache discoCache; /** */ + private DiscoveryEvent evt; + + /** */ + private List<DiscoveryEvent> evts = new ArrayList<>(); + + /** */ private boolean srvJoin; /** */ @@ -47,18 +59,36 @@ public class ExchangeDiscoveryEvents { /** * @param fut Future. */ - void init(GridDhtPartitionsExchangeFuture fut) { - topVer = fut.topologyVersion(); - discoCache = fut.discoCache(); + ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) { + addEvent(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + } - ClusterNode node = fut.discoveryEvent().eventNode(); + boolean groupAddedOnExchange(int grpId, UUID rcvdFrom) { + for (DiscoveryEvent evt : evts) { + if (evt.type() == EVT_NODE_JOINED && rcvdFrom.equals(evt.eventNode().id())) + return true; + } - if (fut.discoveryEvent().type()== EVT_NODE_JOINED) - srvJoin = !CU.clientNode(node); - else { - assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type() == EVT_NODE_FAILED; + return false; + } - srvLeft = !CU.clientNode(node); + void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cache) { + evts.add(evt); + + this.topVer = topVer; + this.evt = evt; + this.discoCache = cache; + + ClusterNode node = evt.eventNode(); + + if (!CU.clientNode(node)) { + if (evt.type()== EVT_NODE_JOINED) + srvJoin = true; + else { + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + + srvLeft = !CU.clientNode(node); + } } } @@ -66,6 +96,10 @@ public class ExchangeDiscoveryEvents { return discoCache; } + DiscoveryEvent event() { + return evt; + } + AffinityTopologyVersion topologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 135b771..4e42bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1738,28 +1738,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx); } - private boolean supportsCoalescing(ClusterNode node) { + private boolean supportsMergeExchanges(ClusterNode node) { return exchangeProtocolVersion(node.version()) > 1; } /** */ - private volatile AffinityTopologyVersion coalesceTestWaitVer; + private volatile AffinityTopologyVersion exchMergeTestWaitVer; /** - * @param coalesceTestWaitVer + * For testing only. + * + * @param exchMergeTestWaitVer Version to wait for. */ - public void coalesceTestWaitVersion(AffinityTopologyVersion coalesceTestWaitVer) { - this.coalesceTestWaitVer = coalesceTestWaitVer; + public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) { + this.exchMergeTestWaitVer = exchMergeTestWaitVer; } - public ExchangeDiscoveryEvents coalesceExchanges(GridDhtPartitionsExchangeFuture curFut) { - ExchangeDiscoveryEvents evts = null; - - AffinityTopologyVersion coalesceTestWaitVer = this.coalesceTestWaitVer; + public boolean mergeExchanges(GridDhtPartitionsExchangeFuture curFut) { + AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer; - if (coalesceTestWaitVer != null) { + if (exchMergeTestWaitVer != null) { log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() + - ", waitVer=" + coalesceTestWaitVer + ']'); + ", waitVer=" + exchMergeTestWaitVer + ']'); long end = U.currentTimeMillis() + 10_000; @@ -1770,8 +1770,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (task instanceof GridDhtPartitionsExchangeFuture) { GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; - if (coalesceTestWaitVer.equals(fut.topologyVersion())) { - log.info("Coalesce test, found awaited version: " + coalesceTestWaitVer); + if (exchMergeTestWaitVer.equals(fut.topologyVersion())) { + log.info("Coalesce test, found awaited version: " + exchMergeTestWaitVer); found = true; @@ -1793,40 +1793,31 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { - if (task instanceof GridDhtPartitionsExchangeFuture) { - GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; + synchronized (curFut) { + int awaited = 0; - int evtType = fut.discoveryEvent().type(); + for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + if (task instanceof GridDhtPartitionsExchangeFuture) { + GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; - if (evtType == EVT_NODE_JOINED) { DiscoveryEvent evt = fut.discoveryEvent(); ClusterNode node = evt.eventNode(); - if (!supportsCoalescing(node)) + if (!supportsMergeExchanges(node)) break; - fut.mergeWithFuture(curFut); - - if (evts == null) - evts = new ExchangeDiscoveryEvents(); - - evts.init(fut); + if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node)) + fut.mergeServerJoinExchange(curFut); exchWorker.futQ.remove(fut); } else break; -// else if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) { -// -// } } - else - break; - } - return evts; + return awaited == 0; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1ec3d73..463e330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -210,6 +212,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>(); /** */ + private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs; + + /** */ + private int awaitMergedMsgs; + + /** */ @GridToStringExclude private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); @@ -444,7 +452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this; exchCtx = new ExchangeContext(cctx.exchange().exchangeProtocolVersion(discoCache.minimumNodeVersion()), - topologyVersion()); + this); try { discoCache.updateAlives(cctx.discovery()); @@ -497,51 +505,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } else { - if (exchCtx.coalescing()) { -// if (discoEvt.type() == EVT_NODE_JOINED) { -// if (discoEvt.eventNode().isLocal()) { -// localJoin(); -// -// if (crdNode) { -// exchange = ExchangeType.NONE; -// } -// else -// sendLocalJoinMessage(crd); -// } -// else { -// if (CU.clientNode(discoEvt.eventNode())) { -// onClientNodeEvent(crdNode); -// -// exchange = ExchangeType.NONE; -// } -// else { -// if (cctx.kernalContext().clientNode()) -// exchange = ExchangeType.CLIENT; -// else { -// -// } -// } -// } -// } -// else { -// -// } - } - else { - if (discoEvt.type() == EVT_NODE_JOINED) { - if (!discoEvt.eventNode().isLocal()) { - Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches( - discoEvt.eventNode().id(), - topVer); + if (discoEvt.type() == EVT_NODE_JOINED) { + if (!discoEvt.eventNode().isLocal()) { + Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches( + discoEvt.eventNode().id(), + topVer); - cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); - } - else - initCachesOnLocalJoin(); + cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } + else + initCachesOnLocalJoin(); + } - exchange = CU.clientNode(discoEvt.eventNode()) ? - onClientNodeEvent(crdNode) : + if (exchCtx.canMergeExchanges()) { + if (cctx.kernalContext().clientNode() || CU.clientNode(discoEvt.eventNode())) + exchange = onClientNodeEvent(crdNode); + else + exchange = ExchangeType.ALL_2; + } + else { + exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) : onServerNodeEvent(crdNode); } } @@ -556,6 +539,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte break; } + case ALL_2: { + distributedExchange2(); + + break; + } + case CLIENT: { initTopologies(); @@ -874,6 +863,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @throws IgniteCheckedException If failed. */ + private void distributedExchange2() throws IgniteCheckedException { + if (crd.isLocal()) { + if (remaining.isEmpty()) + onAllReceived(); + } + else + sendPartitions(crd); + + initDone(); + } + + /** + * @throws IgniteCheckedException If failed. + */ private void distributedExchange() throws IgniteCheckedException { assert crd != null; @@ -914,11 +917,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } cctx.database().beforeExchange(this); -// -// ExchangeDiscoveryEvents mergedEvts = null; -// -// if (crd.isLocal()) -// mergedEvts = cctx.exchange().coalesceExchanges(this); if (crd.isLocal()) { if (remaining.isEmpty()) @@ -1453,59 +1451,63 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private GridDhtPartitionsExchangeFuture mergedWith; /** */ - private List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs; + private GridDhtPartitionsSingleMessage pendingSingleMsg; + + /** */ + private Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingClientMsgs; + + private void addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + if (mergedJoinExchMsgs == null) + mergedJoinExchMsgs = new LinkedHashMap<>(); + + if (msg != null) + mergedJoinExchMsgs.put(nodeId, msg); + else { + if (cctx.discovery().alive(nodeId)) + awaitMergedMsgs++; + else + mergedJoinExchMsgs.put(nodeId, null); + } + } /** * @param fut Current exchange to merge with. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) { + public void mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) { log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']'); - List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs = null; - synchronized (this) { - synchronized (fut) { - assert !isDone(); - assert !initFut.isDone(); - assert mergedWith == null; + assert !isDone(); + assert !initFut.isDone(); + assert mergedWith == null; + assert !CU.clientNode(discoEvt.eventNode()) : discoEvt; - mergedWith = fut; + mergedWith = fut; - if (this.pendingMsgs != null) { - pendingMsgs = this.pendingMsgs; + fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSingleMsg); - T2<UUID, GridDhtPartitionsSingleMessage> joinedSrvMsg = null; + // TODO 5578 client messages. + } + } - if (discoEvt.type() == EVT_NODE_JOINED && !CU.clientNode(discoEvt.eventNode())) { - for (Iterator<T2<UUID, GridDhtPartitionsSingleMessage>> it = pendingMsgs.iterator(); it.hasNext();) { - T2<UUID, GridDhtPartitionsSingleMessage> msg = it.next(); + void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { + boolean done = false; - if (msg.get1().equals(discoEvt.eventNode().id())) { - joinedSrvMsg = msg; + synchronized (this) { + if (mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id())) { + mergedJoinExchMsgs.put(node.id(), msg); - it.remove(); + assert awaitMergedMsgs > 0 : awaitMergedMsgs; - break; - } - } + awaitMergedMsgs--; - if (pendingMsgs.isEmpty()) - pendingMsgs = null; - } - } + done = awaitMergedMsgs == 0; } } - if (pendingMsgs != null) { - final List<T2<UUID, GridDhtPartitionsSingleMessage>> pendingMsgs0 = pendingMsgs; + if (done) { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut0) { - for (T2<UUID, GridDhtPartitionsSingleMessage> msg : pendingMsgs0) - fut.processSingleMessage(msg.get1(), msg.get2()); - } - }); } } @@ -1516,7 +1518,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { assert msg != null; assert exchId.equals(msg.exchangeId()) : msg; - assert msg.lastVersion() != null : msg; if (isDone()) { if (log.isDebugEnabled()) @@ -1537,15 +1538,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (mergedWith != null) mergedWith0 = mergedWith; else { - if (pendingMsgs == null) - pendingMsgs = new ArrayList<>(); - - pendingMsgs.add(new T2<>(node.id(), msg)); + if (msg.client()) { + assert false; + } + else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) + pendingSingleMsg = msg; } } if (mergedWith0 != null) { - mergedWith0.onReceive(node, msg); + mergedWith0.onReceiveMerged(node, msg); return; } @@ -1857,6 +1859,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + if (exchCtx.canMergeExchanges()) { + cctx.exchange().mergeExchanges(this); + + cctx.affinity().onTopologyChange(this, true); + + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; + + grp.topology().beforeExchange(this, true); + } + } + Map<Integer, CacheGroupAffinityMessage> cachesAff = null; for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { @@ -2160,7 +2175,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - finishState = new FinishState(crd.id()); + finishState = new FinishState(crd.id(), msg.resultTopologyVersion()); + } + + if (exchCtx.canMergeExchanges()) { + try { + onServerNodeEvent(true); + + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; + + grp.topology().beforeExchange(this, true); + } + } + catch (IgniteCheckedException e) { + // TODO 5578. + U.error(log, "Failed: " + e, e); + } } Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); @@ -2568,8 +2600,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte enum ExchangeType { /** */ CLIENT, + /** */ ALL, + + /** */ + ALL_2, + /** */ NONE } @@ -2680,11 +2717,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private final UUID crdId; + /** */ + private final AffinityTopologyVersion topVer; + /** * @param crdId Coordinator node. */ - FinishState(UUID crdId) { + FinishState(UUID crdId, AffinityTopologyVersion topVer) { this.crdId = crdId; + this.topVer = topVer; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 4c79c3b..8a5dbbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -154,6 +154,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.cachesAff = cachesAff; } + AffinityTopologyVersion resultTopologyVersion() { + return resTopVer; + } + /** * @param cachesAff Affinity. * @return Message copy. http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java deleted file mode 100644 index dbd3971..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class CacheExchangeCoalescingTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - - cfg.setClientMode(client); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentJoin1() throws Exception { - IgniteEx srv0 = startGrid(0); - - srv0.context().cache().context().exchange().coalesceTestWaitVersion(new AffinityTopologyVersion(3, 0)); - - final AtomicInteger idx = new AtomicInteger(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(idx.getAndIncrement()); - - return null; - } - }, 2, "start-node"); - - fut.get(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ef68e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java new file mode 100644 index 0000000..ef8a1da --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheExchangeMergeTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentJoin1() throws Exception { + IgniteEx srv0 = startGrid(0); + + srv0.context().cache().context().exchange().mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0)); + + final AtomicInteger idx = new AtomicInteger(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + fut.get(); + } +}