Ignite-1093 Logging & Backward compatibility failover fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67f88584 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67f88584 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67f88584 Branch: refs/heads/ignite-1816 Commit: 67f88584a4ab330bbda956b3d0d830468d28920f Parents: 37cafb6 Author: Anton Vinogradov <a...@apache.org> Authored: Tue Nov 10 16:14:15 2015 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Tue Nov 10 16:14:15 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 34 +++++++------------- .../dht/preloader/GridDhtPartitionDemander.java | 25 ++++++++++++-- 2 files changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 479a0b6..5b4fee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -617,13 +617,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @return {@code True} if topology has changed. - */ - public boolean topologyChanged() { - return exchWorker.topologyChanged(); - } - - /** * @param exchFut Exchange future. * @param reassign Dummy reassign flag. */ @@ -673,7 +666,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); - Collection<ClusterNode> rmts = null; + Collection<ClusterNode> rmts; // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -1362,7 +1355,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (marshR != null || !rebalanceQ.isEmpty()) { if (futQ.isEmpty()) { - U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); + U.log(log, "Rebalancing required" + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); if (marshR != null) try { @@ -1404,13 +1399,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } }, /*system pool*/ true); } - else { - U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); - } - } - else { - U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); + else + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } + else + U.log(log, "Skipping rebalancing (nothing scheduled) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().node().id() + ']'); } } catch (IgniteInterruptedCheckedException e) { @@ -1425,13 +1422,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } } - - /** - * @return {@code True} if another exchange future has been queued up. - */ - boolean topologyChanged() { - return !futQ.isEmpty() || busy; - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 29ca5f4..40d3dc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -114,6 +114,10 @@ public class GridDhtPartitionDemander { @Deprecated//Backward compatibility. To be removed in future. private final AtomicInteger dmIdx = new AtomicInteger(); + /** DemandWorker. */ + @Deprecated//Backward compatibility. To be removed in future. + private volatile DemandWorker worker; + /** Cached rebalance topics. */ private final Map<Integer, Object> rebalanceTopics; @@ -166,6 +170,11 @@ public class GridDhtPartitionDemander { rebalanceFut.onDone(false); } + DemandWorker dw = worker; + + if (dw != null) + dw.cancel(); + lastExchangeFut = null; lastTimeoutObj.set(null); @@ -426,9 +435,9 @@ public class GridDhtPartitionDemander { d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. - DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + worker = new DemandWorker(dmIdx.incrementAndGet(), fut); - dw.run(node, d); + worker.run(node, d); } } @@ -1137,6 +1146,13 @@ public class GridDhtPartitionDemander { return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); } + /** */ + public void cancel() { + msgQ.clear(); + + msgQ.offer(new SupplyMessage(null, null)); + } + /** * @param node Node to demand from. * @param topVer Topology version. @@ -1159,7 +1175,7 @@ public class GridDhtPartitionDemander { d.topic(topic(cntr)); d.workerId(id); - if (topologyChanged(fut)) + if (fut.isDone() || topologyChanged(fut)) return; cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { @@ -1228,6 +1244,9 @@ public class GridDhtPartitionDemander { continue; // While. } + if (s.senderId() == null) + return; // Stopping now. + // Check that message was received from expected node. if (!s.senderId().equals(node.id())) { U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +