IGNITE-9012 Fixed exchange await logic in GridServiceProcessor - Fixes #4367.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66e547a9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66e547a9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66e547a9

Branch: refs/heads/ignite-8783
Commit: 66e547a9eebf3e8354135b3300619754294a805d
Parents: 174e9cb
Author: EdShangGG <eshangar...@gridgain.com>
Authored: Tue Jul 17 17:19:36 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Jul 17 17:19:36 2018 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTopologyFuture.java  |  2 +-
 .../service/GridServiceProcessor.java           | 35 ++++++++++----------
 2 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66e547a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index bc0331c..489fb63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -47,7 +47,7 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
 
     /**
      * Gets result topology version of this future. Result version can differ from initial exchange version
-     * if excanges for multiple discovery events are merged, in this case result version is version of last
+     * if exchanges for multiple discovery events are merged, in this case result version is version of last
      * discovery event.
      * <p>
      * This method should be called only for finished topology future

http://git-wip-us.apache.org/repos/asf/ignite/blob/66e547a9/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index f8c4b73..04c50ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1746,39 +1746,40 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
          * @param initTopVer listening-in topology version.
          * @return {@code True} if current event is not last and should be skipped.
          */
-        private boolean skipExchange(AffinityTopologyVersion initTopVer) {
+        private boolean skipExchange(final AffinityTopologyVersion initTopVer) {
             AffinityTopologyVersion pendingTopVer = null;
-            AffinityTopologyVersion newTopVer = currTopVer;
+            AffinityTopologyVersion newTopVer;
 
-            if (!initTopVer.equals(newTopVer))
+            if (!initTopVer.equals(newTopVer = currTopVer))
                 pendingTopVer = newTopVer;
             else {
-                GridDhtTopologyFuture fut = ctx.cache().context().exchange().lastTopologyFuture();
+                IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange().affinityReadyFuture(initTopVer);
 
-                if (!fut.isDone() && !fut.isCancelled()) {
+                if (affReadyFut != null) {
                     try {
-                        fut.get();
+                        affReadyFut.get();
                     }
                     catch (IgniteCheckedException e) {
-                        throw U.convertException(e);
+                        U.error(log, "Failed to wait for affinity ready future " +
+                            "(the assignment will be recalculated anyway)", e);
                     }
                 }
 
-                AffinityTopologyVersion lastTopVer;
-
                 // If exchange already moved forward - skip current version.
-                if (fut.exchangeDone() && newTopVer.compareTo(lastTopVer = fut.topologyVersion()) < 0)
-                    pendingTopVer = lastTopVer;
+                if (!initTopVer.equals(newTopVer = currTopVer))
+                    pendingTopVer = newTopVer;
             }
 
-            if (pendingTopVer != null && log.isInfoEnabled()) {
+            boolean skipExchange = pendingTopVer != null;
+
+            if (skipExchange && log.isInfoEnabled()) {
                 log.info("Service processor detected a topology change during " +
                     "assignments calculation (will abort current iteration and " +
                     "re-calculate on the newer version): " +
                     "[topVer=" + initTopVer + ", newTopVer=" + pendingTopVer + ']');
             }
 
-            return pendingTopVer != null;
+            return skipExchange;
         }
 
         /** {@inheritDoc} */
@@ -1869,11 +1870,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                         // Clean up zombie assignments.
                         IgniteInternalCache<Object, Object> cache = serviceCache();
 
-                        // If topology changed again, let next event handle it.
-                        if (skipExchange(topVer))
-                            return;
-
                         while (it.hasNext()) {
+                            // If topology changed again, let next event handle it.
+                            if (skipExchange(topVer))
+                                return;
+
                             Cache.Entry<Object, Object> e = it.next();
 
                             if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) {

Reply via email to