Repository: ignite
Updated Branches:
  refs/heads/ignite-426-2 203e88b92 -> 803a41414


IGNITE-462 Merged changes from optimize branch.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/803a4141
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/803a4141
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/803a4141

Branch: refs/heads/ignite-426-2
Commit: 803a41414df5d8426841a1491c8b766c43d3497a
Parents: 203e88b
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Wed Nov 18 20:34:24 2015 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Wed Nov 18 20:34:24 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  18 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  12 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 204 ++++++++++---------
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  54 ++++-
 .../continuous/CacheContinuousQueryManager.java |  11 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 6 files changed, 180 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 75c1039..7a3be2f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1190,7 +1190,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             if (cctx.isLocal() || cctx.isReplicated() ||
                 (!isNear() && !(tx != null && tx.onePhaseCommit() && 
!tx.local())))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
tx.local(), false, updateCntr0, topVer);
+                cctx.continuousQueries().onEntryUpdated(key, val, old, 
isInternal() || !context().userCache(),
+                    partition(), tx.local(), false, updateCntr0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1369,7 +1370,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             if (cctx.isLocal() || cctx.isReplicated() ||
                 (!isNear() && !(tx != null && tx.onePhaseCommit() && 
!tx.local())))
-                cctx.continuousQueries().onEntryUpdated(this, key, null, old, 
tx.local(), false, updateCntr0, topVer);
+                cctx.continuousQueries().onEntryUpdated(key, null, old, 
isInternal()
+                    || !context().userCache(),partition(), tx.local(), false, 
updateCntr0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);
 
@@ -1707,8 +1709,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (!isNear()) {
                 long updateCntr = 
nextPartCounter(AffinityTopologyVersion.NONE);
 
-                cctx.continuousQueries().onEntryUpdated(this, key, val, old, 
true, false, updateCntr,
-                    AffinityTopologyVersion.NONE);
+                cctx.continuousQueries().onEntryUpdated(key, val, old, 
isInternal() || !context().userCache(),
+                    partition(), true, false, updateCntr, 
AffinityTopologyVersion.NONE);
             }
 
             cctx.dataStructures().onEntryUpdated(key, op == 
GridCacheOperation.DELETE);
@@ -1967,9 +1969,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             if (updateCntr != null)
                                 updateCntr0 = updateCntr;
 
-                            
cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal,
-                                    prevVal, primary, false, updateCntr0, 
topVer);
-                        }
+                            cctx.continuousQueries().onEntryUpdated(key, 
evtVal, prevVal, isInternal()
+                                || !context().userCache(), partition(), 
primary, false, updateCntr0, topVer);                        }
 
                         return new GridCacheUpdateAtomicResult(false,
                             retval ? rawGetOrUnmarshalUnlocked(false) : null,
@@ -3227,7 +3228,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
-                    cctx.continuousQueries().onEntryUpdated(this, key, val, 
null, true, preload, updateCntr, topVer);
+                    cctx.continuousQueries().onEntryUpdated(key, val, null, 
this.isInternal()
+                        || !this.context().userCache(), this.partition(), 
true, preload, updateCntr, topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1ef1d1e..94c3ee1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1836,8 +1836,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     }
                 }
                 else if (!entry.isNear() && updRes.success()) {
-                    ctx.continuousQueries().onEntryUpdated(entry, entry.key(), 
updRes.newValue(), updRes.oldValue(),
-                        primary, false, updRes.updateCounter(), topVer);
+                    ctx.continuousQueries().onEntryUpdated(entry.key(), 
updRes.newValue(), updRes.oldValue(),
+                        entry.isInternal() || !context().userCache(), 
entry.partition(), primary, false,
+                        updRes.updateCounter(), topVer);
                 }
 
                 if (hasNear) {
@@ -2525,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         CacheObject prevVal = req.previousValue(i);
 
                         EntryProcessor<Object, Object, Object> entryProcessor 
= req.entryProcessor(i);
-                        Long updateIdx = req.updateIdx(i);
+                        Long updateIdx = req.updateCounter(i);
 
                         GridCacheOperation op = entryProcessor != null ? 
TRANSFORM :
                             (val != null) ? UPDATE : DELETE;
@@ -2565,8 +2566,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             ctx.onDeferredDelete(entry, 
updRes.removeVersion());
 
                         if (updRes.success() && !entry.isNear())
-                            ctx.continuousQueries().onEntryUpdated(entry, 
entry.key(), updRes.newValue(),
-                                updRes.oldValue(), false, false, 
updRes.updateCounter(), req.topologyVersion());
+                            
ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
+                                updRes.oldValue(), entry.isInternal() || 
!context().userCache(), entry.partition(),
+                                false, false, updRes.updateCounter(), 
req.topologyVersion());
 
                         entry.onUnlock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7befd42..eb09ff2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,10 +20,8 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -45,12 +43,10 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T4;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -69,42 +65,42 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
     protected static IgniteLogger log;
 
     /** Cache context. */
-    private GridCacheContext cctx;
+    private final GridCacheContext cctx;
 
     /** Future version. */
-    private GridCacheVersion futVer;
+    private final GridCacheVersion futVer;
 
     /** Write version. */
-    private GridCacheVersion writeVer;
+    private final GridCacheVersion writeVer;
 
     /** Force transform backup flag. */
     private boolean forceTransformBackups;
 
     /** Completion callback. */
     @GridToStringExclude
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> 
completionCb;
+    private final CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new 
ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
     /** Update request. */
-    private GridNearAtomicUpdateRequest updateReq;
+    private final GridNearAtomicUpdateRequest updateReq;
 
     /** Update response. */
-    private GridNearAtomicUpdateResponse updateRes;
+    private final GridNearAtomicUpdateResponse updateRes;
 
     /** Future keys. */
-    private Collection<KeyCacheObject> keys;
-
-    /** Updates. */
-    private List<T4<GridDhtCacheEntry, CacheObject, CacheObject, Long>> 
updates;
+    private final Collection<KeyCacheObject> keys;
 
     /** */
-    private boolean waitForExchange;
+    private final boolean waitForExchange;
+
+    /** Response count. */
+    private volatile int resCnt;
 
     /**
      * @param cctx Cache context.
@@ -124,22 +120,21 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         this.cctx = cctx;
         this.writeVer = writeVer;
 
-        futVer = cctx.versions().next(updateReq.topologyVersion());
-        this.updateReq = updateReq;
-        this.completionCb = completionCb;
-        this.updateRes = updateRes;
-
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, 
GridDhtAtomicUpdateFuture.class);
+    futVer = cctx.versions().next(updateReq.topologyVersion());
+    this.updateReq = updateReq;
+    this.completionCb = completionCb;
+    this.updateRes = updateRes;
 
-        keys = new ArrayList<>(updateReq.keys().size());
+    if (log == null)
+    log = U.logger(cctx.kernalContext(), logRef, 
GridDhtAtomicUpdateFuture.class);
 
-        updates = new ArrayList<>(updateReq.keys().size());
+    keys = new ArrayList<>(updateReq.keys().size());
+    mappings = U.newHashMap(updateReq.keys().size());
 
-        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() 
&& !updateReq.clientRequest());
+    boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && 
!updateReq.clientRequest());
 
-        waitForExchange = !topLocked;
-    }
+    waitForExchange = !topLocked;
+}
 
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
@@ -152,22 +147,42 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Processing node leave event [fut=" + this + ", nodeId=" 
+ nodeId + ']');
+
+        return registerResponse(nodeId);
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
         return F.view(F.viewReadOnly(mappings.keySet(), 
U.id2Node(cctx.kernalContext())), F.notNull());
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        if (log.isDebugEnabled())
-            log.debug("Processing node leave event [fut=" + this + ", nodeId=" 
+ nodeId + ']');
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if request found.
+     */
+    private boolean registerResponse(UUID nodeId) {
+        int resCnt0;
 
         GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
 
         if (req != null) {
-            // Remove only after added keys to failed set.
-            mappings.remove(nodeId);
+            synchronized (this) {
+                if (req.onResponse()) {
+                    resCnt0 = resCnt;
 
-            checkComplete();
+                    resCnt0 += 1;
+
+                    resCnt = resCnt0;
+                }
+                else
+                    return false;
+            }
+
+            if (resCnt0 == mappings.size())
+                onDone();
 
             return true;
         }
@@ -205,7 +220,7 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
-     * @param updateIdx Partition update index.
+     * @param updateCntr Partition update counter.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
@@ -215,12 +230,10 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateIdx) {
+        @Nullable Long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        int part = entry.partition();
-
-        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, 
topVer);
+        Collection<ClusterNode> dhtNodes = 
cctx.dht().topology().nodes(entry.partition(), topVer);
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + 
U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -229,8 +242,6 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
 
         keys.add(entry.key());
 
-        updates.add(new T4<>(entry, val, prevVal, updateIdx));
-
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
@@ -261,8 +272,20 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
                     conflictExpireTime,
                     conflictVer,
                     addPrevVal,
+                    entry.partition(),
                     prevVal,
-                    updateIdx);
+                    updateCntr);
+            }
+            else if (dhtNodes.size() == 1) {
+                try {
+                    cctx.continuousQueries().onEntryUpdated(entry.key(), val, 
prevVal,
+                        entry.key().internal() || !cctx.userCache(), 
entry.partition(), true, false,
+                        updateCntr, updateReq.topologyVersion());
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send continuous query message. 
[key=" + entry.key() + ", newVal="
+                        + val + ", err=" + e + "]");
+                }
             }
         }
     }
@@ -332,43 +355,55 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                int i = 0;
+                if (!mappings.isEmpty()) {
+                    Collection<KeyCacheObject> hndKeys = new 
ArrayList<>(keys.size());
 
-                for (KeyCacheObject key : keys) {
-                    updateRes.addFailedKey(key, err);
+                    exit: for (GridDhtAtomicUpdateRequest req : 
mappings.values()) {
+                        for (int i = 0; i < req.size(); i++) {
+                            KeyCacheObject key = req.key(i);
 
-                    if (i < updates.size()) {
+                            if (!hndKeys.contains(key)) {
+                                updateRes.addFailedKey(key, err);
 
-                        T4<GridDhtCacheEntry, CacheObject, CacheObject, Long> 
upd = updates.get(i);
+                                cctx.continuousQueries().skipUpdateEvent(key, 
req.partitionId(i), req.updateCounter(i),
+                                    updateReq.topologyVersion());
 
-                        cctx.continuousQueries().skipUpdateEvent(key, 
upd.get1().partition(), upd.get4(),
-                            updateReq.topologyVersion());
+                                hndKeys.add(key);
 
-                        ++i;
+                                if (hndKeys.size() == keys.size())
+                                    break exit;
+                            }
+                        }
                     }
                 }
+                else
+                    for (KeyCacheObject key : keys)
+                        updateRes.addFailedKey(key, err);
             }
             else {
-                assert keys.size() >= updates.size();
-
-                int i = 0;
-
-                for (KeyCacheObject key : keys) {
-                    if (i == updates.size())
-                        break;
-
-                    T4<GridDhtCacheEntry, CacheObject, CacheObject, Long> upd 
= updates.get(i);
-
-                    try {
-                        cctx.continuousQueries().onEntryUpdated(upd.get1(), 
key, upd.get2(), upd.get3(), true, false,
-                            upd.get4(), updateReq.topologyVersion());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.warn(log, "Failed to send continuous query message. 
[key=" + key + ", newVal="
-                            + upd.get1() + ", err=" + e + "]");
+                Collection<KeyCacheObject> hndKeys = new 
ArrayList<>(keys.size());
+
+                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) 
{
+                    for (int i = 0; i < req.size(); i++) {
+                        KeyCacheObject key = req.key(i);
+
+                        if (!hndKeys.contains(key)) {
+                            try {
+                                cctx.continuousQueries().onEntryUpdated(key, 
req.value(i), req.localPreviousValue(i),
+                                    key.internal() || !cctx.userCache(), 
req.partitionId(i), true, false,
+                                    req.updateCounter(i), 
updateReq.topologyVersion());
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.warn(log, "Failed to send continuous query 
message. [key=" + key + ", newVal="
+                                    + req.value(i) + ", err=" + e + "]");
+                            }
+
+                            hndKeys.add(key);
+
+                            if (hndKeys.size() == keys.size())
+                                break exit;
+                        }
                     }
-
-                    ++i;
                 }
             }
 
@@ -397,18 +432,18 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
                     U.warn(log, "Failed to send update request to backup node 
because it left grid: " +
                         req.nodeId());
 
-                    mappings.remove(req.nodeId());
+                    registerResponse(req.nodeId());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send update request to backup node 
(did node leave the grid?): "
                         + req.nodeId(), e);
 
-                    mappings.remove(req.nodeId());
+                    registerResponse(req.nodeId());
                 }
             }
         }
-
-        checkComplete();
+        else
+            onDone();
 
         // Send response right away if no ACKs from backup is required.
         // Backups will send ACKs anyway, future will be completed after all 
backups have replied.
@@ -443,9 +478,7 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(nodeId);
-
-        checkComplete();
+        registerResponse(nodeId);
     }
 
     /**
@@ -457,22 +490,7 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result 
[nodeId=" + nodeId + ']');
 
-        mappings.remove(nodeId);
-
-        checkComplete();
-    }
-
-    /**
-     * Checks if all required responses are received.
-     */
-    private void checkComplete() {
-        // Always wait for replies from all backups.
-        if (mappings.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Completing DHT atomic update future: " + this);
-
-            onDone();
-        }
+        registerResponse(nodeId);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 82a7313..72a60d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -147,6 +147,16 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     /** Partition. */
     private GridLongList updateCntrs;
 
+    /** On response flag. Access should be synced on future. */
+    @GridDirectTransient
+    private boolean onRes;
+
+    @GridDirectTransient
+    private List<Integer> partIds;
+
+    @GridDirectTransient
+    private List<CacheObject> localPrevVals;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -197,6 +207,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         this.addDepInfo = addDepInfo;
 
         keys = new ArrayList<>();
+        partIds = new ArrayList<>();
+        localPrevVals = new ArrayList<>();
 
         if (forceTransformBackups) {
             entryProcessors = new ArrayList<>();
@@ -230,10 +242,15 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         long conflictExpireTime,
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
+        int partId,
         @Nullable CacheObject prevVal,
         @Nullable Long updateIdx) {
         keys.add(key);
 
+        partIds.add(partId);
+
+        localPrevVals.add(prevVal);
+
         if (forceTransformBackups) {
             assert entryProcessor != null;
 
@@ -306,8 +323,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
-        long expireTime)
-    {
+        long expireTime) {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
 
@@ -438,12 +454,20 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /**
-     * @param idx Counter index.
+     * @param idx Partition index.
+     * @return Partition id.
+     */
+    public int partitionId(int idx) {
+        return partIds.get(idx);
+    }
+
+    /**
+     * @param updCntr Update counter.
      * @return Update counter.
      */
-    public Long updateIdx(int idx) {
-        if (updateCntrs != null && idx < updateCntrs.size())
-            return updateCntrs.get(idx);
+    public Long updateCounter(int updCntr) {
+        if (updateCntrs != null && updCntr < updateCntrs.size())
+            return updateCntrs.get(updCntr);
 
         return null;
     }
@@ -480,6 +504,14 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public CacheObject localPreviousValue(int idx) {
+        return localPrevVals.get(idx);
+    }
+
+    /**
+     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int 
idx) {
@@ -576,14 +608,20 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /**
+     * @return {@code True} if on response flag changed.
+     */
+    public boolean onResponse() {
+        return !onRes && (onRes = true);
+    }
+
+    /**
      * @return Optional arguments for entry processor.
      */
     @Nullable public Object[] invokeArguments() {
         return invokeArgs;
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
         super.prepareMarshal(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 925561b..8bbb916 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -183,31 +183,30 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     }
 
     /**
-     * @param e Cache entry.
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
+     * @param internal Internal entry (internal key or not user cache),
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
      * @param topVer Topology version.
      * @throws IgniteCheckedException In case of error.
      */
-    public void onEntryUpdated(GridCacheEntryEx e,
+    public void onEntryUpdated(
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
+        boolean internal,
+        int partId,
         boolean primary,
         boolean preload,
         long updateCntr,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
-        assert e != null;
         assert key != null;
 
-        boolean internal = e.isInternal() || !e.context().userCache();
-
         if (preload && !internal)
             return;
 
@@ -257,7 +256,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 key,
                 newVal,
                 lsnr.oldValueRequired() ? oldVal : null,
-                e.partition(),
+                partId,
                 updateCntr,
                 topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/803a4141/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index c4aaa2a..b311272 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1091,7 +1091,7 @@ public abstract class 
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             }
         }
 
-        if (!lostAllow && !lostEvents.isEmpty()) {
+        if (!lostAllow && lostEvents.size() > 100) {
             log.error("Lost event cnt: " + lostEvents.size());
 
             for (T3<Object, Object, Object> e : lostEvents)

Reply via email to