Repository: ignite Updated Branches: refs/heads/ignite-4424 e5f04ccbc -> 6bcd9c631
IGNITE-4424 REPLICATED cache isn't synced across nodes Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6bcd9c63 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6bcd9c63 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6bcd9c63 Branch: refs/heads/ignite-4424 Commit: 6bcd9c63121dd63279788d123f400bf24be12a28 Parents: e5f04cc Author: Anton Vinogradov <a...@apache.org> Authored: Fri Dec 23 16:20:32 2016 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Fri Dec 23 16:20:32 2016 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 26 ++++++++++---------- .../GridNearAtomicSingleUpdateFuture.java | 15 +++++++---- .../dht/atomic/GridNearAtomicUpdateFuture.java | 21 ++++++++++------ 3 files changed, 37 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index fd0a699..19b386b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -212,16 +212,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt // Cannot remap. remapCnt = 1; - beforeMap(topVer); + GridCacheVersion futVer = addAtomicFuture(topVer); - map(topVer); + if (futVer != null) + map(topVer, futVer); } } /** * @param topVer Topology version. + * @param futVer Future version */ - protected abstract void map(AffinityTopologyVersion topVer); + protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer); /** * Maps future on ready topology. @@ -321,21 +323,19 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * Adds future prevents topology change before operation complete. * Should be invoked before topology lock released. * @param topVer Topology version. + * @return Future version in case Future added. */ - protected void beforeMap(AffinityTopologyVersion topVer) { + protected GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) { GridCacheVersion futVer = cctx.versions().next(topVer); - synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; - this.futVer = futVer; - } - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) + if (!cctx.mvcc().addAtomicFuture(futVer, this)) { assert isDone() : this; + + return null; + } } + + return futVer; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 3ce1d75..1b32ec9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -383,6 +383,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion topVer; + GridCacheVersion futVer; + try { if (cache.topology().stopping()) { onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + @@ -404,7 +406,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda topVer = fut.topologyVersion(); - beforeMap(topVer); + futVer = addAtomicFuture(topVer); } else { if (waitTopFut) { @@ -430,11 +432,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cache.topology().readUnlock(); } - map(topVer); + if (futVer != null) + map(topVer, futVer); } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer) { + @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -467,10 +470,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda singleReq0 = mapSingleUpdate(topVer, futVer, updVer); synchronized (mux) { - assert this.futVer != null : this; - assert this.topVer == topVer : this; + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + this.topVer = topVer; this.updVer = updVer; + this.futVer = futVer; resCnt = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 0acf351..10b0fbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -492,6 +492,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion topVer; + GridCacheVersion futVer; + try { if (cache.topology().stopping()) { onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + @@ -513,7 +515,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu topVer = fut.topologyVersion(); - beforeMap(topVer); + futVer = addAtomicFuture(topVer); } else { if (waitTopFut) { @@ -539,7 +541,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu cache.topology().readUnlock(); } - map(topVer, remapKeys); + if (futVer != null) + map(topVer, futVer, remapKeys); } /** @@ -597,15 +600,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer) { - map(topVer, null); + @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { + map(topVer, futVer, null); } /** * @param topVer Topology version. * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + void map(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -669,10 +674,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (mux) { - assert this.futVer != null : this; - assert this.topVer == topVer : this; + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + this.topVer = topVer; this.updVer = updVer; + this.futVer = futVer; resCnt = 0;