Repository: ignite Updated Branches: refs/heads/ignite-1093-2 6629fea44 -> 57064970f
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57064970 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57064970 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57064970 Branch: refs/heads/ignite-1093-2 Commit: 57064970fe04ccdeefed9800250bc8f0c2cb831f Parents: 6629fea Author: Anton Vinogradov <[email protected]> Authored: Fri Oct 16 09:23:50 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Oct 16 09:23:50 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 11 +++--- .../dht/preloader/GridDhtPartitionSupplier.java | 38 ++++++++++++-------- 2 files changed, 28 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/57064970/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 3caff69..b8aa2b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -398,9 +398,8 @@ public class GridDhtPartitionDemander { cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); -// if (log.isDebugEnabled()) -// log.debug( - U.log(log,"Requested rebalancing [from node=" + node.id() + ", listener index=" + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); } @@ -500,9 +499,8 @@ public class GridDhtPartitionDemander { return; } -// if (log.isDebugEnabled()) -// log.debug( - U.log(log,"Received supply message: " + supply); + if (log.isDebugEnabled()) + log.debug("Received supply message: " + supply); // Check whether there were class loading errors on unmarshal if (supply.classError() != null) { @@ -982,7 +980,6 @@ public class GridDhtPartitionDemander { } /** - * * @param cancelled Is cancelled. */ private void checkIsDone(boolean cancelled) { http://git-wip-us.apache.org/repos/asf/ignite/blob/57064970/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index d68ce8b..00d1b9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -187,9 +188,22 @@ class GridDhtPartitionSupplier { assert d != null; assert id != null; - if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) { + AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion demTop = d.topologyVersion(); + + if (!cutTop.equals(demTop)) { + if (cutTop.compareTo(demTop) < 0) + // Resend demand message. + try { + cctx.io().sendOrderedMessage(cctx.localNode(), GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to resend partition supply message to local node: " + cctx.localNode().id()); + } + else if (log.isDebugEnabled()) + log.debug("Demand request cancelled [current=" + cctx.affinity().affinityTopologyVersion() + ", demanded=" + d.topologyVersion() + "]"); - U.log(log, "Demand request cancelled [current=" + cctx.affinity().affinityTopologyVersion() + ", demanded=" + d.topologyVersion() + "]"); return; } @@ -228,9 +242,8 @@ class GridDhtPartitionSupplier { maxBatchesCnt = 1; } else { -// if (log.isDebugEnabled()) -// log.debug - U.log(log, "Starting supplying rebalancing [cache=" + cctx.name() + + if (log.isDebugEnabled()) + log.debug("Starting supplying rebalancing [cache=" + cctx.name() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -571,9 +584,8 @@ class GridDhtPartitionSupplier { reply(node, d, s, scId); -// if (log.isDebugEnabled()) -// log.debug( - U.log(log, "Finished supplying rebalancing [cache=" + cctx.name() + + if (log.isDebugEnabled()) + log.debug("Finished supplying rebalancing [cache=" + cctx.name() + ", fromNode=" + node.id() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -597,9 +609,8 @@ class GridDhtPartitionSupplier { throws IgniteCheckedException { try { -// if (log.isDebugEnabled()) -// log.debug - U.log(log, "Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); @@ -610,9 +621,8 @@ class GridDhtPartitionSupplier { return true; } catch (ClusterTopologyCheckedException ignore) { -// if (log.isDebugEnabled()) -// log.debug - U.log(log, "Failed to send partition supply message because node left grid: " + n.id()); + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); clearContext(scMap.remove(scId), log);
