cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/592f2a71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/592f2a71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/592f2a71

Branch: refs/heads/ignite-5075-cc-debug
Commit: 592f2a71592274b091f372e1ae228716947ab0f1
Parents: 6c73a36
Author: sboikov <sboi...@gridgain.com>
Authored: Fri May 26 14:14:16 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri May 26 14:14:16 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/592f2a71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 31b7ace..7d33614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -23,12 +23,12 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  *
@@ -48,7 +48,7 @@ public class CacheContinuousQueryEventBuffer {
     private AtomicReference<Batch> curBatch = new AtomicReference<>();
 
     /** */
-    private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque<>();
+    private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque8<>();
 
     /** */
     private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
@@ -85,14 +85,20 @@ public class CacheContinuousQueryEventBuffer {
         if (batch != null)
             ret = batch.flushCurrentEntries();
 
-        if (!backupQ.isEmpty()) {
+        int size = backupQ.sizex();
+
+        if (size > 0) {
             if (ret == null)
                 ret = new ArrayList<>();
 
-            CacheContinuousQueryEntry e;
+            for (int i = 0; i < size; i++) {
+                CacheContinuousQueryEntry e = backupQ.pollFirst();
 
-            while ((e = backupQ.pollFirst()) != null)
-                ret.add(e);
+                if (e != null)
+                    ret.add(e);
+                else
+                    break;
+            }
         }
 
         if (!pending.isEmpty()) {