ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d20b76c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d20b76c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d20b76c4 Branch: refs/heads/ignite-5075 Commit: d20b76c43d242bb9270e606688bc3adba5e61075 Parents: 194446d Author: sboikov <sboi...@gridgain.com> Authored: Wed May 17 21:55:17 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed May 17 21:55:17 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 37 +++++++++++--------- 1 file changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d20b76c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index efb02c6..2802217 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -567,10 +567,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler GridCacheContext<K, V> cctx = cacheContext(ctx); if (!cctx.isLocal()) { - cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get(); + AffinityTopologyVersion topVer = initTopVer; + + cacheContext(ctx).affinity().affinityReadyFuture(topVer).get(); for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) - getOrCreatePartitionRecovery(ctx, partId); + getOrCreatePartitionRecovery(ctx, partId, topVer); } } @@ -736,7 +738,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(); } - PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); + PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion()); return rec.collectEntries(e, cctx, cache); } @@ -869,37 +871,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** * @param ctx Context. * @param partId Partition id. + * @param topVer Topology version for current operation. * @return Partition recovery. */ - @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) { + @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, + int partId, + AffinityTopologyVersion topVer) { + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + PartitionRecovery rec = rcvs.get(partId); if (rec == null) { T2<Long, Long> partCntrs = null; - AffinityTopologyVersion initTopVer0 = initTopVer; + Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode; - if (initTopVer0 != null) { + if (initUpdCntrsPerNode != null) { GridCacheContext<K, V> cctx = cacheContext(ctx); GridCacheAffinityManager aff = cctx.affinity(); - if (initUpdCntrsPerNode != null) { - for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) { - Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id()); + for (ClusterNode node : aff.nodesByPartition(partId, topVer)) { + Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id()); - if (map != null) { - partCntrs = map.get(partId); + if (map != null) { + partCntrs = map.get(partId); - break; - } + break; } } - else if (initUpdCntrs != null) - partCntrs = initUpdCntrs.get(partId); } + else if (initUpdCntrs != null) + partCntrs = initUpdCntrs.get(partId); - rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0, + rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer, partCntrs != null ? partCntrs.get2() : null); PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);