IGNITE-9012 Fixed exchange await logic in GridServiceProcessor - Fixes #4367.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66e547a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66e547a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66e547a9 Branch: refs/heads/ignite-8783 Commit: 66e547a9eebf3e8354135b3300619754294a805d Parents: 174e9cb Author: EdShangGG <eshangar...@gridgain.com> Authored: Tue Jul 17 17:19:36 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Jul 17 17:19:36 2018 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTopologyFuture.java | 2 +- .../service/GridServiceProcessor.java | 35 ++++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/66e547a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index bc0331c..489fb63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -47,7 +47,7 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo /** * Gets result topology version of this future. Result version can differ from initial exchange version - * if excanges for multiple discovery events are merged, in this case result version is version of last + * if exchanges for multiple discovery events are merged, in this case result version is version of last * discovery event. * <p> * This method should be called only for finished topology future http://git-wip-us.apache.org/repos/asf/ignite/blob/66e547a9/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index f8c4b73..04c50ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1746,39 +1746,40 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param initTopVer listening-in topology version. * @return {@code True} if current event is not last and should be skipped. */ - private boolean skipExchange(AffinityTopologyVersion initTopVer) { + private boolean skipExchange(final AffinityTopologyVersion initTopVer) { AffinityTopologyVersion pendingTopVer = null; - AffinityTopologyVersion newTopVer = currTopVer; + AffinityTopologyVersion newTopVer; - if (!initTopVer.equals(newTopVer)) + if (!initTopVer.equals(newTopVer = currTopVer)) pendingTopVer = newTopVer; else { - GridDhtTopologyFuture fut = ctx.cache().context().exchange().lastTopologyFuture(); + IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange().affinityReadyFuture(initTopVer); - if (!fut.isDone() && !fut.isCancelled()) { + if (affReadyFut != null) { try { - fut.get(); + affReadyFut.get(); } catch (IgniteCheckedException e) { - throw U.convertException(e); + U.error(log, "Failed to wait for affinity ready future " + + "(the assignment will be recalculated anyway)", e); } } - AffinityTopologyVersion lastTopVer; - // If exchange already moved forward - skip current version. - if (fut.exchangeDone() && newTopVer.compareTo(lastTopVer = fut.topologyVersion()) < 0) - pendingTopVer = lastTopVer; + if (!initTopVer.equals(newTopVer = currTopVer)) + pendingTopVer = newTopVer; } - if (pendingTopVer != null && log.isInfoEnabled()) { + boolean skipExchange = pendingTopVer != null; + + if (skipExchange && log.isInfoEnabled()) { log.info("Service processor detected a topology change during " + "assignments calculation (will abort current iteration and " + "re-calculate on the newer version): " + "[topVer=" + initTopVer + ", newTopVer=" + pendingTopVer + ']'); } - return pendingTopVer != null; + return skipExchange; } /** {@inheritDoc} */ @@ -1869,11 +1870,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite // Clean up zombie assignments. IgniteInternalCache<Object, Object> cache = serviceCache(); - // If topology changed again, let next event handle it. - if (skipExchange(topVer)) - return; - while (it.hasNext()) { + // If topology changed again, let next event handle it. + if (skipExchange(topVer)) + return; + Cache.Entry<Object, Object> e = it.next(); if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) {