ignite-5075

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d20b76c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d20b76c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d20b76c4

Branch: refs/heads/ignite-5075
Commit: d20b76c43d242bb9270e606688bc3adba5e61075
Parents: 194446d
Author: sboikov <sboi...@gridgain.com>
Authored: Wed May 17 21:55:17 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed May 17 21:55:17 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 37 +++++++++++---------
 1 file changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d20b76c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..2802217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -567,10 +567,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         GridCacheContext<K, V> cctx = cacheContext(ctx);
 
         if (!cctx.isLocal()) {
-            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+            AffinityTopologyVersion topVer = initTopVer;
+
+            cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
 
             for (int partId = 0; partId < 
cacheContext(ctx).affinity().partitions(); partId++)
-                getOrCreatePartitionRecovery(ctx, partId);
+                getOrCreatePartitionRecovery(ctx, partId, topVer);
         }
     }
 
@@ -736,7 +738,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 Collections.<CacheEntryEvent<? extends K, ? extends 
V>>emptyList();
         }
 
-        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, 
e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, 
e.partition(), e.topologyVersion());
 
         return rec.collectEntries(e, cctx, cache);
     }
@@ -869,37 +871,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /**
      * @param ctx Context.
      * @param partId Partition id.
+     * @param topVer Topology version for current operation.
      * @return Partition recovery.
      */
-    @NotNull private PartitionRecovery 
getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+    @NotNull private PartitionRecovery 
getOrCreatePartitionRecovery(GridKernalContext ctx,
+        int partId,
+        AffinityTopologyVersion topVer) {
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
             T2<Long, Long> partCntrs = null;
 
-            AffinityTopologyVersion initTopVer0 = initTopVer;
+            Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = 
this.initUpdCntrsPerNode;
 
-            if (initTopVer0 != null) {
+            if (initUpdCntrsPerNode != null) {
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
                 GridCacheAffinityManager aff = cctx.affinity();
 
-                if (initUpdCntrsPerNode != null) {
-                    for (ClusterNode node : aff.nodesByPartition(partId, 
initTopVer)) {
-                        Map<Integer, T2<Long, Long>> map = 
initUpdCntrsPerNode.get(node.id());
+                for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+                    Map<Integer, T2<Long, Long>> map = 
initUpdCntrsPerNode.get(node.id());
 
-                        if (map != null) {
-                            partCntrs = map.get(partId);
+                    if (map != null) {
+                        partCntrs = map.get(partId);
 
-                            break;
-                        }
+                        break;
                     }
                 }
-                else if (initUpdCntrs != null)
-                    partCntrs = initUpdCntrs.get(partId);
             }
+            else if (initUpdCntrs != null)
+                partCntrs = initUpdCntrs.get(partId);
 
-            rec = new 
PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+            rec = new 
PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
                 partCntrs != null ? partCntrs.get2() : null);
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);

Reply via email to