1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9abfc606 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9abfc606 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9abfc606 Branch: refs/heads/ignite-1093-2 Commit: 9abfc60694ddb2cc4dc11cd41124c79c09d083dd Parents: e1651dd Author: Anton Vinogradov <a...@apache.org> Authored: Mon Sep 28 14:39:52 2015 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Mon Sep 28 14:39:52 2015 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionDemandMessage.java | 7 ++ .../dht/preloader/GridDhtPartitionDemander.java | 68 ++++++++++++-------- .../dht/preloader/GridDhtPartitionSupplier.java | 7 +- 3 files changed, 53 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 06ac54b..6c60930 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -123,6 +123,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { } /** + * @param updateSeq Update sequence. + */ + void updateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + + /** * @return Update sequence. */ long updateSequence() { http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 345e3bd..5d4db40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -64,7 +64,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -110,6 +110,9 @@ public class GridDhtPartitionDemander { /** Demand lock. */ private final ReadWriteLock demandLock; + /** Rebalancing iteration counter. */ + private long updateSeq = 0; + /** * @param cctx Cctx. * @param demandLock Demand lock. @@ -194,7 +197,10 @@ public class GridDhtPartitionDemander { * @return {@code True} if topology changed. */ private boolean topologyChanged(SyncFuture fut) { - return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || fut != syncFut; + return + !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. + fut != syncFut || // Same topology, but dummy exchange forced because of missing partitions. + cctx.shared().exchange().hasPendingExchange(); // New topology pending. } /** @@ -256,20 +262,20 @@ public class GridDhtPartitionDemander { final SyncFuture oldFut = syncFut; - if (!oldFut.isDummy() && assigns.topologyVersion().compareTo(oldFut.topologyVersion()) < 0) { - U.log(log, "Skipping obsolete (dummy) exchange. [top=" + assigns.topologyVersion() + "]"); + if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology. + U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]"); return; } - final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy()); + final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq); if (!oldFut.isDummy()) oldFut.cancel(); else - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> future) { - oldFut.onDone(); + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> future) { + oldFut.onDone(fut.result()); } }); @@ -385,13 +391,13 @@ public class GridDhtPartitionDemander { final CacheConfiguration cfg = cctx.config(); - U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + - ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]"); - //Check remote node rebalancing API version. if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) { - fut.appendPartitions(node.id(), d.partitions(), d.updateSequence()); + U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); + + fut.appendPartitions(node.id(), d.partitions()); int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); @@ -415,6 +421,7 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); + initD.updateSequence(fut.updateSeq); try { if (!topologyChanged(fut)) { @@ -442,9 +449,13 @@ public class GridDhtPartitionDemander { } } else { + U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]"); + DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); - fut.appendPartitions(node.id(), d.partitions(), d.updateSequence()); + fut.appendPartitions(node.id(), d.partitions()); dw.run(node, d); } @@ -516,7 +527,9 @@ public class GridDhtPartitionDemander { assert node != null; - if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut) || !fut.isActual(id, supply.updateSequence())) + if (!fut.topologyVersion().equals(topVer) || // Current future based on another topology. + topologyChanged(fut) || // Topology already changed (for current future) or new topology pending. + !fut.isActual(supply.updateSequence())) // Current future have same topology, but another update sequence. return; if (log.isDebugEnabled()) @@ -761,7 +774,7 @@ public class GridDhtPartitionDemander { private final IgniteLogger log; /** Remaining. T3: startTime, partitions, updateSequence */ - private final Map<UUID, T3<Long, Collection<Integer>, Long>> remaining = new HashMap<>(); + private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>(); /** Missed. */ private final Map<UUID, Collection<Integer>> missed = new HashMap<>(); @@ -776,6 +789,9 @@ public class GridDhtPartitionDemander { /** Topology version. */ private final AffinityTopologyVersion topVer; + /** Unique (per demander) sequence id. */ + private final long updateSeq; + /** * @param assigns Assigns. * @param cctx Context. @@ -785,7 +801,8 @@ public class GridDhtPartitionDemander { SyncFuture(GridDhtPreloaderAssignments assigns, GridCacheContext<?, ?> cctx, IgniteLogger log, - boolean sentStopEvnt) { + boolean sentStopEvnt, + long updateSeq) { assert assigns != null; this.exchFut = assigns.exchangeFuture(); @@ -793,6 +810,7 @@ public class GridDhtPartitionDemander { this.cctx = cctx; this.log = log; this.sendStoppedEvnt = sentStopEvnt; + this.updateSeq = updateSeq; if (assigns != null) cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen( @@ -812,6 +830,7 @@ public class GridDhtPartitionDemander { this.cctx = null; this.log = null; this.sendStoppedEvnt = false; + this.updateSeq = -1; } /** @@ -822,14 +841,11 @@ public class GridDhtPartitionDemander { } /** - * @param nodeId Node id. * @param updateSeq Update sequence. * @return true in case future created for specified updateSeq, false in other case. */ - private boolean isActual(UUID nodeId, long updateSeq) { - T3<Long, Collection<Integer>, Long> t = remaining.get(nodeId); - - return t != null ? t.get3().equals(updateSeq) : false; + private boolean isActual(long updateSeq) { + return this.updateSeq == updateSeq; } /** @@ -843,11 +859,11 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. * @param parts Parts. */ - private void appendPartitions(UUID nodeId, Collection<Integer> parts, long updateSeq) { + private void appendPartitions(UUID nodeId, Collection<Integer> parts) { lock.lock(); try { - remaining.put(nodeId, new T3<>(U.currentTimeMillis(), parts, updateSeq)); + remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); } finally { lock.unlock(); @@ -1027,10 +1043,10 @@ public class GridDhtPartitionDemander { if (!m.isEmpty()) { U.log(log, ("Reassigning partitions that were missed: " + m)); - cctx.shared().exchange().forceDummyExchange(true, exchFut); - onDone(false); //Finished but has missed partitions and forced dummy exchange + cctx.shared().exchange().forceDummyExchange(true, exchFut); + return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9abfc606/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index e23a50b..81e2fa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -105,7 +105,7 @@ class GridDhtPartitionSupplier { SupplyContext sc = entry.getValue(); - if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) + if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) clearContext(scMap, t, sc, log); } } @@ -128,7 +128,7 @@ class GridDhtPartitionSupplier { } }; - cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED); + cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED); startOldListeners(); } @@ -532,7 +532,8 @@ class GridDhtPartitionSupplier { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) + if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()) || // Topology already changed. + cctx.shared().exchange().hasPendingExchange()) // New topology pending. return true; cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());