cc

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/592f2a71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/592f2a71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/592f2a71

Branch: refs/heads/ignite-5075-cc-debug
Commit: 592f2a71592274b091f372e1ae228716947ab0f1
Parents: 6c73a36
Author: sboikov <sboi...@gridgain.com>
Authored: Fri May 26 14:14:16 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri May 26 14:14:16 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java          | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/592f2a71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 31b7ace..7d33614 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -23,12 +23,12 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  *
@@ -48,7 +48,7 @@ public class CacheContinuousQueryEventBuffer {
     private AtomicReference<Batch> curBatch = new AtomicReference<>();
 
     /** */
-    private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new 
ConcurrentLinkedDeque<>();
+    private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new 
ConcurrentLinkedDeque8<>();
 
     /** */
     private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = 
new ConcurrentSkipListMap<>();
@@ -85,14 +85,20 @@ public class CacheContinuousQueryEventBuffer {
         if (batch != null)
             ret = batch.flushCurrentEntries();
 
-        if (!backupQ.isEmpty()) {
+        int size = backupQ.sizex();
+
+        if (size > 0) {
             if (ret == null)
                 ret = new ArrayList<>();
 
-            CacheContinuousQueryEntry e;
+            for (int i = 0; i < size; i++) {
+                CacheContinuousQueryEntry e = backupQ.pollFirst();
 
-            while ((e = backupQ.pollFirst()) != null)
-                ret.add(e);
+                if (e != null)
+                    ret.add(e);
+                else
+                    break;
+            }
         }
 
         if (!pending.isEmpty()) {

Reply via email to