Ignite-1093 Logging & Backward compatibility failover fixes.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67f88584
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67f88584
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67f88584

Branch: refs/heads/ignite-1816
Commit: 67f88584a4ab330bbda956b3d0d830468d28920f
Parents: 37cafb6
Author: Anton Vinogradov <a...@apache.org>
Authored: Tue Nov 10 16:14:15 2015 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Tue Nov 10 16:14:15 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 34 +++++++-------------
 .../dht/preloader/GridDhtPartitionDemander.java | 25 ++++++++++++--
 2 files changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 479a0b6..5b4fee3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -617,13 +617,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @return {@code True} if topology has changed.
-     */
-    public boolean topologyChanged() {
-        return exchWorker.topologyChanged();
-    }
-
-    /**
      * @param exchFut Exchange future.
      * @param reassign Dummy reassign flag.
      */
@@ -673,7 +666,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Refreshing partitions [oldest=" + oldest.id() + ", 
loc=" + cctx.localNodeId() + ']');
 
-        Collection<ClusterNode> rmts = null;
+        Collection<ClusterNode> rmts;
 
         // If this is the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -1362,7 +1355,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                         if (marshR != null || !rebalanceQ.isEmpty()) {
                             if (futQ.isEmpty()) {
-                                U.log(log, "Starting caches rebalancing [top=" 
+ exchFut.topologyVersion() + "]");
+                                U.log(log, "Rebalancing required" +
+                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + 
exchFut.discoveryEvent().node().id() + ']');
 
                                 if (marshR != null)
                                     try {
@@ -1404,13 +1399,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                     }
                                 }, /*system pool*/ true);
                             }
-                            else {
-                                U.log(log, "Obsolete exchange, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
-                            }
-                        }
-                        else {
-                            U.log(log, "Nothing scheduled, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
+                            else
+                                U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
+                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + 
exchFut.discoveryEvent().node().id() + ']');
                         }
+                        else
+                            U.log(log, "Skipping rebalancing (nothing 
scheduled) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
+                                ", node=" + 
exchFut.discoveryEvent().node().id() + ']');
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {
@@ -1425,13 +1422,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 }
             }
         }
-
-        /**
-         * @return {@code True} if another exchange future has been queued up.
-         */
-        boolean topologyChanged() {
-            return !futQ.isEmpty() || busy;
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/67f88584/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 29ca5f4..40d3dc1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -114,6 +114,10 @@ public class GridDhtPartitionDemander {
     @Deprecated//Backward compatibility. To be removed in future.
     private final AtomicInteger dmIdx = new AtomicInteger();
 
+    /** DemandWorker. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private volatile DemandWorker worker;
+
     /** Cached rebalance topics. */
     private final Map<Integer, Object> rebalanceTopics;
 
@@ -166,6 +170,11 @@ public class GridDhtPartitionDemander {
             rebalanceFut.onDone(false);
         }
 
+        DemandWorker dw = worker;
+
+        if (dw != null)
+            dw.cancel();
+
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -426,9 +435,9 @@ public class GridDhtPartitionDemander {
                 d.timeout(cctx.config().getRebalanceTimeout());
                 d.workerId(0);//old api support.
 
-                DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), 
fut);
+                worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
 
-                dw.run(node, d);
+                worker.run(node, d);
             }
         }
 
@@ -1137,6 +1146,13 @@ public class GridDhtPartitionDemander {
             return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
         }
 
+        /** */
+        public void cancel() {
+            msgQ.clear();
+
+            msgQ.offer(new SupplyMessage(null, null));
+        }
+
         /**
          * @param node Node to demand from.
          * @param topVer Topology version.
@@ -1159,7 +1175,7 @@ public class GridDhtPartitionDemander {
             d.topic(topic(cntr));
             d.workerId(id);
 
-            if (topologyChanged(fut))
+            if (fut.isDone() || topologyChanged(fut))
                 return;
 
             cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, 
GridDhtPartitionSupplyMessage>() {
@@ -1228,6 +1244,9 @@ public class GridDhtPartitionDemander {
                                 continue; // While.
                         }
 
+                        if (s.senderId() == null)
+                            return; // Stopping now.
+
                         // Check that message was received from expected node.
                         if (!s.senderId().equals(node.id())) {
                             U.warn(log, "Received supply message from 
unexpected node [expectedId=" + node.id() +

Reply via email to