Repository: ignite
Updated Branches:
  refs/heads/ignite-4424 e5f04ccbc -> 6bcd9c631


IGNITE-4424 REPLICATED cache isn't synced across nodes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6bcd9c63
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6bcd9c63
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6bcd9c63

Branch: refs/heads/ignite-4424
Commit: 6bcd9c63121dd63279788d123f400bf24be12a28
Parents: e5f04cc
Author: Anton Vinogradov <a...@apache.org>
Authored: Fri Dec 23 16:20:32 2016 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Fri Dec 23 16:20:32 2016 +0300

----------------------------------------------------------------------
 .../GridNearAtomicAbstractUpdateFuture.java     | 26 ++++++++++----------
 .../GridNearAtomicSingleUpdateFuture.java       | 15 +++++++----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 21 ++++++++++------
 3 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index fd0a699..19b386b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -212,16 +212,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             // Cannot remap.
             remapCnt = 1;
 
-            beforeMap(topVer);
+            GridCacheVersion futVer = addAtomicFuture(topVer);
 
-            map(topVer);
+            if (futVer != null)
+                map(topVer, futVer);
         }
     }
 
     /**
      * @param topVer Topology version.
+     * @param futVer Future version
      */
-    protected abstract void map(AffinityTopologyVersion topVer);
+    protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
 
     /**
      * Maps future on ready topology.
@@ -321,21 +323,19 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * Adds future prevents topology change before operation complete.
      * Should be invoked before topology lock released.
      * @param topVer Topology version.
+     * @return Future version in case Future added.
      */
-    protected void beforeMap(AffinityTopologyVersion topVer) {
+    protected GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
         GridCacheVersion futVer = cctx.versions().next(topVer);
 
-        synchronized (mux) {
-            assert this.futVer == null : this;
-            assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
-            this.topVer = topVer;
-            this.futVer = futVer;
-        }
-
         if (storeFuture()) {
-            if (!cctx.mvcc().addAtomicFuture(futVer, this))
+            if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
                 assert isDone() : this;
+
+                return null;
+            }
         }
+
+        return futVer;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 3ce1d75..1b32ec9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -383,6 +383,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         AffinityTopologyVersion topVer;
 
+        GridCacheVersion futVer;
+
         try {
             if (cache.topology().stopping()) {
                 onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
@@ -404,7 +406,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 topVer = fut.topologyVersion();
 
-                beforeMap(topVer);
+                futVer = addAtomicFuture(topVer);
             }
             else {
                 if (waitTopFut) {
@@ -430,11 +432,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             cache.topology().readUnlock();
         }
 
-        map(topVer);
+        if (futVer != null)
+            map(topVer, futVer);
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
+    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -467,10 +470,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
 
             synchronized (mux) {
-                assert this.futVer != null : this;
-                assert this.topVer == topVer : this;
+                assert this.futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
+                this.topVer = topVer;
                 this.updVer = updVer;
+                this.futVer = futVer;
 
                 resCnt = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bcd9c63/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 0acf351..10b0fbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -492,6 +492,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         AffinityTopologyVersion topVer;
 
+        GridCacheVersion futVer;
+
         try {
             if (cache.topology().stopping()) {
                 onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
@@ -513,7 +515,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
                 topVer = fut.topologyVersion();
 
-                beforeMap(topVer);
+                futVer = addAtomicFuture(topVer);
             }
             else {
                 if (waitTopFut) {
@@ -539,7 +541,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             cache.topology().readUnlock();
         }
 
-        map(topVer, remapKeys);
+        if (futVer != null)
+            map(topVer, futVer, remapKeys);
     }
 
     /**
@@ -597,15 +600,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    @Override protected void map(AffinityTopologyVersion topVer) {
-        map(topVer, null);
+    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
+        map(topVer, futVer, null);
     }
 
     /**
      * @param topVer Topology version.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+    void map(AffinityTopologyVersion topVer,
+        GridCacheVersion futVer,
+        @Nullable Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -669,10 +674,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
 
             synchronized (mux) {
-                assert this.futVer != null : this;
-                assert this.topVer == topVer : this;
+                assert this.futVer == null : this;
+                assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
+                this.topVer = topVer;
                 this.updVer = updVer;
+                this.futVer = futVer;
 
                 resCnt = 0;
 

Reply via email to