Tried to simplify GridDhtAtomicCache.updateAllAsyncInternal0.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5704e393
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5704e393
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5704e393

Branch: refs/heads/ignite-5578
Commit: 5704e393102cc8c24df7bfb4ff9053003530b7fc
Parents: 9e79c4b
Author: sboikov <[email protected]>
Authored: Thu Jul 27 12:50:37 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jul 27 12:50:37 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/DhtAtomicUpdateResult.java       | 131 +++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 380 ++++++++-----------
 .../GridNearAtomicAbstractUpdateFuture.java     |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   4 +-
 4 files changed, 282 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
new file mode 100644
index 0000000..e7d2b19
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of an atomic DHT update: return value, deleted entries, DHT update future and expiry policy.
+ */
+class DhtAtomicUpdateResult {
+    /** Operation return value. */
+    private GridCacheReturn retVal;
+
+    /** Entries paired with their remove versions (see {@link #addDeleted}); may be {@code null}. */
+    private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> 
deleted;
+
+    /** DHT update future. */
+    private GridDhtAtomicAbstractUpdateFuture dhtFut;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiry;
+
+    /**
+     * Creates an empty result; fields are filled in later through the setters.
+     */
+    DhtAtomicUpdateResult() {
+        // No-op.
+    }
+
+    /**
+     * @param retVal Return value.
+     * @param deleted Deleted entries.
+     * @param dhtFut DHT update future.
+     */
+    DhtAtomicUpdateResult(GridCacheReturn retVal,
+        Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted,
+        GridDhtAtomicAbstractUpdateFuture dhtFut) {
+        this.retVal = retVal;
+        this.deleted = deleted;
+        this.dhtFut = dhtFut;
+    }
+
+    /**
+     * @param expiry Expiry policy.
+     */
+    void expiryPolicy(@Nullable IgniteCacheExpiryPolicy expiry) {
+        this.expiry = expiry;
+    }
+
+    /**
+     * @return Expiry policy.
+     */
+    @Nullable IgniteCacheExpiryPolicy expiryPolicy() {
+        return expiry;
+    }
+
+    /**
+     * @param entry Entry to record as deleted.
+     * @param updRes Entry update result; the entry is recorded only if its remove version is not {@code null}.
+     * @param entries All entries; the size is used as initial capacity of the deleted collection.
+     */
+    void addDeleted(GridDhtCacheEntry entry,
+        GridCacheUpdateAtomicResult updRes,
+        Collection<GridDhtCacheEntry> entries) {
+        if (updRes.removeVersion() != null) {
+            if (deleted == null)
+                deleted = new ArrayList<>(entries.size());
+
+            deleted.add(F.t(entry, updRes.removeVersion()));
+        }
+    }
+
+    /**
+     * @return Deleted entries (may be {@code null} if none were recorded).
+     */
+    Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() {
+        return deleted;
+    }
+
+    /**
+     * @return DHT future.
+     */
+    GridDhtAtomicAbstractUpdateFuture dhtFuture() {
+        return dhtFut;
+    }
+
+    /**
+     * @param retVal Result for operation.
+     */
+    void returnValue(GridCacheReturn retVal) {
+        this.retVal = retVal;
+    }
+
+    /**
+     * @return Result for operation.
+     */
+    GridCacheReturn returnValue() {
+        return retVal;
+    }
+
+    /**
+     * @param dhtFut DHT future.
+     */
+    void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
+        this.dhtFut = dhtFut;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 712babd..be4aace 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1658,12 +1658,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     /**
      * Executes local update.
      *
-     * @param nodeId Node ID.
+     * @param node Node.
      * @param req Update request.
      * @param completionCb Completion callback.
      */
     void updateAllAsyncInternal(
-        final UUID nodeId,
+        final ClusterNode node,
         final GridNearAtomicAbstractUpdateRequest req,
         final UpdateReplyClosure completionCb
     ) {
@@ -1678,12 +1678,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 return;
             }
             catch (IgniteCheckedException e) {
-                onForceKeysError(nodeId, req, completionCb, e);
+                onForceKeysError(node.id(), req, completionCb, e);
 
                 return;
             }
 
-            updateAllAsyncInternal0(nodeId, req, completionCb);
+            updateAllAsyncInternal0(node, req, completionCb);
         }
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@@ -1695,12 +1695,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         return;
                     }
                     catch (IgniteCheckedException e) {
-                        onForceKeysError(nodeId, req, completionCb, e);
+                        onForceKeysError(node.id(), req, completionCb, e);
 
                         return;
                     }
 
-                    updateAllAsyncInternal0(nodeId, req, completionCb);
+                    updateAllAsyncInternal0(node, req, completionCb);
                 }
             });
         }
@@ -1732,26 +1732,17 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     /**
      * Executes local update after preloader fetched values.
      *
-     * @param nodeId Node ID.
+     * @param node Node.
      * @param req Update request.
      * @param completionCb Completion callback.
      */
     private void updateAllAsyncInternal0(
-        UUID nodeId,
+        ClusterNode node,
         GridNearAtomicAbstractUpdateRequest req,
         UpdateReplyClosure completionCb
     ) {
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null) {
-            U.warn(msgLog, "Skip near update request, node originated update 
request left [" +
-                "futId=" + req.futureId() + ", node=" + nodeId + ']');
-
-            return;
-        }
-
         GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(ctx.cacheId(),
-            nodeId,
+            node.id(),
             req.futureId(),
             req.partition(),
             false,
@@ -1763,8 +1754,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         boolean remap = false;
 
-        String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
-
         IgniteCacheExpiryPolicy expiry = null;
 
         ctx.shared().database().checkpointReadLock();
@@ -1795,97 +1784,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     // Do not check topology version if topology was locked on 
near node by
                     // external transaction or explicit lock.
                     if (req.topologyLocked() || 
!needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        boolean hasNear = req.nearCache();
-
-                        // Assign next version for update inside entries lock.
-                        GridCacheVersion ver = 
ctx.versions().next(top.topologyVersion());
-
-                        if (hasNear)
-                            res.nearVersion(ver);
-
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("Assigned update version [futId=" + 
req.futureId() +
-                                ", writeVer=" + ver + ']');
-                        }
-
-                        assert ver != null : "Got null version for update 
request: " + req;
-
-                        boolean sndPrevVal = 
!top.rebalanceFinished(req.topologyVersion());
-
-                        dhtFut = createDhtFuture(ver, req);
-
-                        expiry = expiryPolicy(req.expiry());
-
-                        GridCacheReturn retVal = null;
-
-                        if (req.size() > 1 &&                    // Several 
keys ...
-                            writeThrough() && !req.skipStore() && // and store 
is enabled ...
-                            !ctx.store().isLocal() &&             // and this 
is not local store ...
-                            // (conflict resolver should be used for local 
store)
-                            !ctx.dr().receiveEnabled()            // and no DR.
-                            ) {
-                            // This method can only be used when there are no 
replicated entries in the batch.
-                            UpdateBatchResult updRes = updateWithBatch(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
-
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
-
-                            if (req.operation() == TRANSFORM)
-                                retVal = updRes.invokeResults();
-                        }
-                        else {
-                            UpdateSingleResult updRes = updateSingle(node,
-                                hasNear,
-                                req,
-                                res,
-                                locked,
-                                ver,
-                                dhtFut,
-                                ctx.isDrEnabled(),
-                                taskName,
-                                expiry,
-                                sndPrevVal);
-
-                            retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
-                            dhtFut = updRes.dhtFuture();
-                        }
+                        DhtAtomicUpdateResult updRes = update(node, locked, 
req, res);
 
-                        if (retVal == null)
-                            retVal = new GridCacheReturn(ctx, node.isLocal(), 
true, null, true);
-
-                        res.returnValue(retVal);
-
-                        if (dhtFut != null) {
-                            if (req.writeSynchronizationMode() == PRIMARY_SYNC
-                                // To avoid deadlock disable back-pressure for 
sender data node.
-                                && !ctx.discovery().cacheAffinityNode(node, 
ctx.name())
-                                && !dhtFut.isDone()) {
-                                final IgniteRunnable tracker = 
GridNioBackPressureControl.threadTracker();
-
-                                if (tracker != null && tracker instanceof 
GridNioMessageTracker) {
-                                    
((GridNioMessageTracker)tracker).onMessageReceived();
-
-                                    dhtFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Void>>() {
-                                        @Override public void 
apply(IgniteInternalFuture<Void> fut) {
-                                            
((GridNioMessageTracker)tracker).onMessageProcessed();
-                                        }
-                                    });
-                                }
-                            }
-
-                            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
-                        }
+                        dhtFut = updRes.dhtFuture();
+                        deleted = updRes.deleted();
+                        expiry = updRes.expiryPolicy();
                     }
                     else {
                         // Should remap all keys.
@@ -1953,9 +1856,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
-        else
+        else {
             if (dhtFut != null)
                 dhtFut.map(node, res.returnValue(), res, completionCb);
+        }
 
         if (req.writeSynchronizationMode() != FULL_ASYNC)
             req.cleanup(!node.isLocal());
@@ -1964,6 +1868,122 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * @param node Sender node.
+     * @param locked Locked entries.
+     * @param req Update request.
+     * @param res Response; receives the near version (if request has near cache) and the return value.
+     * @return Operation result (expiry policy is set on it before returning).
+     * @throws GridCacheEntryRemovedException If got obsolete entry.
+     */
+    private DhtAtomicUpdateResult update(
+        ClusterNode node,
+        List<GridDhtCacheEntry> locked,
+        GridNearAtomicAbstractUpdateRequest req,
+        GridNearAtomicUpdateResponse res)
+        throws GridCacheEntryRemovedException
+    {
+        GridDhtPartitionTopology top = topology();
+
+        String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+        boolean hasNear = req.nearCache();
+
+        // Assign next version for update inside entries lock.
+        GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+
+        if (hasNear)
+            res.nearVersion(ver);
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Assigned update version [futId=" + req.futureId() +
+                ", writeVer=" + ver + ']');
+        }
+
+        assert ver != null : "Got null version for update request: " + req;
+
+        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
+        GridDhtAtomicAbstractUpdateFuture dhtFut = createDhtFuture(ver, req);
+
+        IgniteCacheExpiryPolicy expiry = expiryPolicy(req.expiry());
+
+        GridCacheReturn retVal = null;
+
+        DhtAtomicUpdateResult updRes;
+
+        if (req.size() > 1 &&                    // Several keys ...
+            writeThrough() && !req.skipStore() && // and store is enabled ...
+            !ctx.store().isLocal() &&             // and this is not local 
store ...
+            // (conflict resolver should be used for local store)
+            !ctx.dr().receiveEnabled()            // and no DR.
+            ) {
+            // This method can only be used when there are no replicated 
entries in the batch.
+            updRes = updateWithBatch(node,
+                hasNear,
+                req,
+                res,
+                locked,
+                ver,
+                dhtFut,
+                ctx.isDrEnabled(),
+                taskName,
+                expiry,
+                sndPrevVal);
+
+            dhtFut = updRes.dhtFuture();
+
+            if (req.operation() == TRANSFORM)
+                retVal = updRes.returnValue();
+        }
+        else {
+            updRes = updateSingle(node,
+                hasNear,
+                req,
+                res,
+                locked,
+                ver,
+                dhtFut,
+                ctx.isDrEnabled(),
+                taskName,
+                expiry,
+                sndPrevVal);
+
+            retVal = updRes.returnValue();
+            dhtFut = updRes.dhtFuture();
+        }
+
+        if (retVal == null)
+            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, 
true);
+
+        res.returnValue(retVal);
+
+        if (dhtFut != null) {
+            if (req.writeSynchronizationMode() == PRIMARY_SYNC
+                // To avoid deadlock disable back-pressure for sender data 
node.
+                && !ctx.discovery().cacheAffinityNode(node, ctx.name())
+                && !dhtFut.isDone()) {
+                final IgniteRunnable tracker = 
GridNioBackPressureControl.threadTracker();
+
+                if (tracker != null && tracker instanceof 
GridNioMessageTracker) {
+                    ((GridNioMessageTracker)tracker).onMessageReceived();
+
+                    dhtFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Void>>() {
+                        @Override public void apply(IgniteInternalFuture<Void> 
fut) {
+                            
((GridNioMessageTracker)tracker).onMessageProcessed();
+                        }
+                    });
+                }
+            }
+
+            ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
+        }
+
+        updRes.expiryPolicy(expiry);
+
+        return updRes;
+    }
+
+    /**
      * Updates locked entries using batched write-through.
      *
      * @param node Sender node.
@@ -1981,7 +2001,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
     @SuppressWarnings("unchecked")
-    private UpdateBatchResult updateWithBatch(
+    private DhtAtomicUpdateResult updateWithBatch(
         final ClusterNode node,
         final boolean hasNear,
         final GridNearAtomicAbstractUpdateRequest req,
@@ -2004,7 +2024,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             catch (IgniteCheckedException e) {
                 res.addFailedKeys(req.keys(), e);
 
-                return new UpdateBatchResult();
+                return new DhtAtomicUpdateResult();
             }
         }
 
@@ -2018,7 +2038,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         List<CacheObject> writeVals = null;
 
-        UpdateBatchResult updRes = new UpdateBatchResult();
+        DhtAtomicUpdateResult updRes = new DhtAtomicUpdateResult();
 
         List<GridDhtCacheEntry> filtered = new ArrayList<>(size);
 
@@ -2317,7 +2337,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         updRes.dhtFuture(dhtFut);
 
-        updRes.invokeResult(invokeRes);
+        updRes.returnValue(invokeRes);
 
         return updRes;
     }
@@ -2390,7 +2410,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
-    private UpdateSingleResult updateSingle(
+    private DhtAtomicUpdateResult updateSingle(
         ClusterNode nearNode,
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
@@ -2577,7 +2597,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         }
 
-        return new UpdateSingleResult(retVal, deleted, dhtFut);
+        return new DhtAtomicUpdateResult(retVal, deleted, dhtFut);
     }
 
     /**
@@ -2615,7 +2635,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
-        final UpdateBatchResult batchRes,
+        final DhtAtomicUpdateResult batchRes,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
         final boolean sndPrevVal
@@ -3060,7 +3080,16 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 ", node=" + nodeId + ']');
         }
 
-        updateAllAsyncInternal(nodeId, req, updateReplyClos);
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null) {
+            U.warn(msgLog, "Skip near update request, node originated update 
request left [" +
+                "futId=" + req.futureId() + ", node=" + nodeId + ']');
+
+            return;
+        }
+
+        updateAllAsyncInternal(node, req, updateReplyClos);
     }
 
     /**
@@ -3541,119 +3570,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * Result of {@link GridDhtAtomicCache#updateSingle} execution.
-     */
-    private static class UpdateSingleResult {
-        /** */
-        private final GridCacheReturn retVal;
-
-        /** */
-        private final Collection<IgniteBiTuple<GridDhtCacheEntry, 
GridCacheVersion>> deleted;
-
-        /** */
-        private final GridDhtAtomicAbstractUpdateFuture dhtFut;
-
-        /**
-         * @param retVal Return value.
-         * @param deleted Deleted entries.
-         * @param dhtFut DHT future.
-         */
-        private UpdateSingleResult(GridCacheReturn retVal,
-            Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> 
deleted,
-            GridDhtAtomicAbstractUpdateFuture dhtFut) {
-            this.retVal = retVal;
-            this.deleted = deleted;
-            this.dhtFut = dhtFut;
-        }
-
-        /**
-         * @return Return value.
-         */
-        private GridCacheReturn returnValue() {
-            return retVal;
-        }
-
-        /**
-         * @return Deleted entries.
-         */
-        private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> 
deleted() {
-            return deleted;
-        }
-
-        /**
-         * @return DHT future.
-         */
-        public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
-            return dhtFut;
-        }
-    }
-
-    /**
-     * Result of {@link GridDhtAtomicCache#updateWithBatch} execution.
-     */
-    private static class UpdateBatchResult {
-        /** */
-        private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> 
deleted;
-
-        /** */
-        private GridDhtAtomicAbstractUpdateFuture dhtFut;
-
-        /** */
-        private GridCacheReturn invokeRes;
-
-        /**
-         * @param entry Entry.
-         * @param updRes Entry update result.
-         * @param entries All entries.
-         */
-        private void addDeleted(GridDhtCacheEntry entry,
-            GridCacheUpdateAtomicResult updRes,
-            Collection<GridDhtCacheEntry> entries) {
-            if (updRes.removeVersion() != null) {
-                if (deleted == null)
-                    deleted = new ArrayList<>(entries.size());
-
-                deleted.add(F.t(entry, updRes.removeVersion()));
-            }
-        }
-
-        /**
-         * @return Deleted entries.
-         */
-        private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> 
deleted() {
-            return deleted;
-        }
-
-        /**
-         * @return DHT future.
-         */
-        public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
-            return dhtFut;
-        }
-
-        /**
-         * @param invokeRes Result for invoke operation.
-         */
-        private void invokeResult(GridCacheReturn invokeRes) {
-            this.invokeRes = invokeRes;
-        }
-
-        /**
-         * @return Result for invoke operation.
-         */
-        GridCacheReturn invokeResults() {
-            return invokeRes;
-        }
-
-        /**
-         * @param dhtFut DHT future.
-         */
-        private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture 
dhtFut) {
-            this.dhtFut = dhtFut;
-        }
-    }
-
-    /**
      *
      */
     private static class FinishedLockFuture extends 
GridFinishedFuture<Boolean> implements GridDhtFuture<Boolean> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 6fe96a4..983b18a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -296,7 +296,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
      */
     final void sendSingleRequest(UUID nodeId, 
GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
-            cache.updateAllAsyncInternal(nodeId, req,
+            cache.updateAllAsyncInternal(cctx.localNode(), req,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void 
apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse 
res) {
                         if (syncMode != FULL_ASYNC)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 138645d..930012a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -706,7 +706,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
         }
 
         if (locUpdate != null) {
-            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+            cache.updateAllAsyncInternal(cctx.localNode(), locUpdate,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void 
apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse 
res) {
                         if (syncMode != FULL_ASYNC)
@@ -730,7 +730,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
      * @param topVer Topology version.
      * @param remapKeys Keys to remap.
      */
-    void map(AffinityTopologyVersion topVer, @Nullable 
Collection<KeyCacheObject> remapKeys) {
+    private void map(AffinityTopologyVersion topVer, @Nullable 
Collection<KeyCacheObject> remapKeys) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {

Reply via email to