Repository: ignite Updated Branches: refs/heads/ignite-3477 3f4a2ee58 -> 949e4c0ac
Must use finished exchange future to call validateCache. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/949e4c0a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/949e4c0a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/949e4c0a Branch: refs/heads/ignite-3477 Commit: 949e4c0ac6a9ebea877bbc336e95f5a8a5cc41f9 Parents: 3f4a2ee Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 11 15:54:30 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 11 15:54:30 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 7 +++-- .../GridCachePartitionExchangeManager.java | 33 +++++++++++++++++--- .../dht/GridPartitionedGetFuture.java | 4 ++- .../dht/GridPartitionedSingleGetFuture.java | 4 ++- .../GridDhtPartitionsExchangeFuture.java | 30 ++++++++++-------- 5 files changed, 56 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 93270ea..9356f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -86,7 +86,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; @@ -1967,8 +1967,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { int keysSize = keys.size(); - Throwable ex = ctx.shared().exchange().lastTopologyFuture() - .validateCache(ctx, recovery, /*read*/true, null, keys); + GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture(); + + Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null; if (ex != null) return new GridFinishedFuture<>(ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1e7689f..46a20ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -145,6 +145,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; /** */ + private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>(); + + /** */ private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); /** */ @@ -159,9 +162,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private GridFutureAdapter<?> reconnectExchangeFut; /** */ - private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>(); - - /** */ private final Object interruptLock = new Object(); /** @@ -610,13 +610,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @return Last completed topology future. + * @return Last initialized topology future. */ public GridDhtTopologyFuture lastTopologyFuture() { return lastInitializedFut; } /** + * @return Last finished topology future. + */ + @Nullable public GridDhtTopologyFuture lastFinishedFuture() { + return lastFinishedFut.get(); + } + + /** + * @param fut Finished future. + */ + public void lastFinishedFuture(GridDhtTopologyFuture fut) { + assert fut != null && fut.isDone() : fut; + + while (true) { + GridDhtTopologyFuture cur = lastFinishedFut.get(); + + if (cur == null || fut.topologyVersion().compareTo(cur.topologyVersion()) > 0) { + if (lastFinishedFut.compareAndSet(cur, fut)) + break; + } + else + break; + } + } + + /** * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index cf329ef..6c4da68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -246,7 +246,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda return; } - Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, null, keys); + GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture(); + + Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, null, keys) : null; if (err != null) { onDone(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 1744cbd..ea69743 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -198,7 +198,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); - Throwable err = cctx.topology().topologyVersionFuture().validateCache(cctx, recovery, true, key, null); + GridDhtTopologyFuture topFut = cctx.shared().exchange().lastFinishedFuture(); + + Throwable err = topFut != null ? topFut.validateCache(cctx, recovery, true, key, null) : null; if (err != null) { onDone(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/949e4c0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b7f6680..75fd3e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -205,7 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private boolean centralizedAff; /** Change global state exception. */ - private Exception changeGlobalStateException; + private Exception changeGlobalStateE; /** Change global state exceptions. */ private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap8<>(); @@ -488,9 +488,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !dummy && !forcePreload : this; try { - AffinityTopologyVersion topVersion = topologyVersion(); + AffinityTopologyVersion topVer = topologyVersion(); - srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVersion)); + srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer)); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@ -522,7 +522,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (discoEvt.type() == EVT_NODE_JOINED) { - Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVersion); + Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer); if (!discoEvt.eventNode().isLocal()) cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); @@ -568,7 +568,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT case NONE: { initTopologies(); - onDone(topVersion); + onDone(topVer); break; } @@ -654,10 +654,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridClusterStateProcessor stateProc = cctx.kernalContext().state(); if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) { - changeGlobalStateException = stateProc.onChangeGlobalState(); + changeGlobalStateE = stateProc.onChangeGlobalState(); - if (crd && changeGlobalStateException != null) - changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateException); + if (crd && changeGlobalStateE != null) + changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE); } boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs); @@ -1044,8 +1044,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage( node, exchangeId(), clientOnlyExchange, true); - if (exchangeOnChangeGlobalState && changeGlobalStateException != null) - m.setException(changeGlobalStateException); + if (exchangeOnChangeGlobalState && changeGlobalStateE != null) + m.setException(changeGlobalStateE); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -1199,6 +1199,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (discoEvt instanceof DiscoveryCustomEvent) ((DiscoveryCustomEvent)discoEvt).customMessage(null); + cctx.exchange().lastFinishedFuture(this); + return true; } @@ -1213,6 +1215,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT @Nullable Object key, @Nullable Collection<?> keys ) { + assert isDone() : this; + Throwable err = error(); if (err != null) @@ -1318,7 +1322,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT changeGlobalStateExceptions.clear(); crd = null; partReleaseFut = null; - changeGlobalStateException = null; + changeGlobalStateE = null; } /** @@ -1920,8 +1924,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (crd0.isLocal()) { - if (exchangeOnChangeGlobalState && changeGlobalStateException!=null) - changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateException); + if (exchangeOnChangeGlobalState && changeGlobalStateE !=null) + changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); if (allReceived) { onAllReceived();