Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb 0834da6de -> 027ebfdee
IGNITE-426 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/027ebfde Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/027ebfde Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/027ebfde Branch: refs/heads/ignite-426-2-reb Commit: 027ebfdee0f36eec30c618fe78335985bbc96803 Parents: 0834da6 Author: nikolay_tikhonov <[email protected]> Authored: Tue Nov 3 12:45:40 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Nov 3 12:45:40 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 +++- .../continuous/GridContinuousProcessor.java | 28 ++++++++++++-------- 2 files changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/027ebfde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 61374cb..54a7bff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -336,7 +336,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> updateRes.addFailedKey(key, err); } else { - assert keys.size() == updates.size(); + assert keys.size() >= updates.size(); int i = 0; @@ -353,6 +353,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } ++i; + + if (i == updates.size()) + break; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/027ebfde/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 497c6e9..9a8ced3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -211,23 +211,29 @@ public class GridContinuousProcessor extends GridProcessorAdapter { LocalRoutineInfo routine = locInfos.get(msg.routineId()); if (routine != null) { - Map<Integer, Long> idxs = msg.updateIdxs(); + try { + Map<Integer, Long> idxs = msg.updateIdxs(); - GridCacheAdapter<Object, Object> interCache = - ctx.cache().internalCache(routine.handler().cacheName()); + GridCacheAdapter<Object, Object> interCache = + ctx.cache().internalCache(routine.handler().cacheName()); - if (interCache != null && idxs != null && interCache.context() != null - && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { - Map<Integer, Long> map = interCache.context().topology().updateCounters(); + if (interCache != null && idxs != null && interCache.context() != null + && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { + Map<Integer, Long> map = interCache.context().topology().updateCounters(); - for (Map.Entry<Integer, Long> e : map.entrySet()) { - Long cntr0 = idxs.get(e.getKey()); - Long cntr1 = e.getValue(); + for (Map.Entry<Integer, Long> e : map.entrySet()) { + Long cntr0 = idxs.get(e.getKey()); + Long cntr1 = e.getValue(); - if (cntr0 == null || cntr1 > cntr0) - idxs.put(e.getKey(), cntr1); + if (cntr0 == null || cntr1 > cntr0) + idxs.put(e.getKey(), cntr1); + } } } + catch (Exception e) { + if (log.isDebugEnabled()) + log.warning("Failed to load update counters.", e); + } routine.handler().updateIdx(msg.updateIdxs()); }
