Tried to simplify GridDhtAtomicCache.updateAllAsyncInternal0.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5704e393 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5704e393 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5704e393 Branch: refs/heads/ignite-5578 Commit: 5704e393102cc8c24df7bfb4ff9053003530b7fc Parents: 9e79c4b Author: sboikov <[email protected]> Authored: Thu Jul 27 12:50:37 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 27 12:50:37 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/DhtAtomicUpdateResult.java | 131 +++++++ .../dht/atomic/GridDhtAtomicCache.java | 380 ++++++++----------- .../GridNearAtomicAbstractUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- 4 files changed, 282 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java new file mode 100644 index 0000000..e7d2b19 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class DhtAtomicUpdateResult { + /** */ + private GridCacheReturn retVal; + + /** */ + private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; + + /** */ + private GridDhtAtomicAbstractUpdateFuture dhtFut; + + /** */ + private IgniteCacheExpiryPolicy expiry; + + /** + * + */ + DhtAtomicUpdateResult() { + // No-op. + } + + /** + * @param retVal Return value. + * @param deleted Deleted entries. + * @param dhtFut DHT update future. + */ + DhtAtomicUpdateResult(GridCacheReturn retVal, + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, + GridDhtAtomicAbstractUpdateFuture dhtFut) { + this.retVal = retVal; + this.deleted = deleted; + this.dhtFut = dhtFut; + } + + /** + * @param expiry Expiry policy. + */ + void expiryPolicy(@Nullable IgniteCacheExpiryPolicy expiry) { + this.expiry = expiry; + } + + /** + * @return Expiry policy. + */ + @Nullable IgniteCacheExpiryPolicy expiryPolicy() { + return expiry; + } + + /** + * @param entry Entry. + * @param updRes Entry update result. + * @param entries All entries. + */ + void addDeleted(GridDhtCacheEntry entry, + GridCacheUpdateAtomicResult updRes, + Collection<GridDhtCacheEntry> entries) { + if (updRes.removeVersion() != null) { + if (deleted == null) + deleted = new ArrayList<>(entries.size()); + + deleted.add(F.t(entry, updRes.removeVersion())); + } + } + + /** + * @return Deleted entries. + */ + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { + return deleted; + } + + /** + * @return DHT future. + */ + GridDhtAtomicAbstractUpdateFuture dhtFuture() { + return dhtFut; + } + + /** + * @param retVal Result for operation. + */ + void returnValue(GridCacheReturn retVal) { + this.retVal = retVal; + } + + /** + * @return Result for invoke operation. + */ + GridCacheReturn returnValue() { + return retVal; + } + + /** + * @param dhtFut DHT future. + */ + void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) { + this.dhtFut = dhtFut; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 712babd..be4aace 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1658,12 +1658,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Executes local update. * - * @param nodeId Node ID. + * @param node Node. * @param req Update request. * @param completionCb Completion callback. */ void updateAllAsyncInternal( - final UUID nodeId, + final ClusterNode node, final GridNearAtomicAbstractUpdateRequest req, final UpdateReplyClosure completionCb ) { @@ -1678,12 +1678,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } catch (IgniteCheckedException e) { - onForceKeysError(nodeId, req, completionCb, e); + onForceKeysError(node.id(), req, completionCb, e); return; } - updateAllAsyncInternal0(nodeId, req, completionCb); + updateAllAsyncInternal0(node, req, completionCb); } else { forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { @@ -1695,12 +1695,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } catch (IgniteCheckedException e) { - onForceKeysError(nodeId, req, completionCb, e); + onForceKeysError(node.id(), req, completionCb, e); return; } - updateAllAsyncInternal0(nodeId, req, completionCb); + updateAllAsyncInternal0(node, req, completionCb); } }); } @@ -1732,26 +1732,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Executes local update after preloader fetched values. * - * @param nodeId Node ID. + * @param node Node. * @param req Update request. * @param completionCb Completion callback. */ private void updateAllAsyncInternal0( - UUID nodeId, + ClusterNode node, GridNearAtomicAbstractUpdateRequest req, UpdateReplyClosure completionCb ) { - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - U.warn(msgLog, "Skip near update request, node originated update request left [" + - "futId=" + req.futureId() + ", node=" + nodeId + ']'); - - return; - } - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), - nodeId, + node.id(), req.futureId(), req.partition(), false, @@ -1763,8 +1754,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean remap = false; - String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - IgniteCacheExpiryPolicy expiry = null; ctx.shared().database().checkpointReadLock(); @@ -1795,97 +1784,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { - boolean hasNear = req.nearCache(); - - // Assign next version for update inside entries lock. - GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); - - if (hasNear) - res.nearVersion(ver); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Assigned update version [futId=" + req.futureId() + - ", writeVer=" + ver + ']'); - } - - assert ver != null : "Got null version for update request: " + req; - - boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); - - dhtFut = createDhtFuture(ver, req); - - expiry = expiryPolicy(req.expiry()); - - GridCacheReturn retVal = null; - - if (req.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - // (conflict resolver should be used for local store) - !ctx.dr().receiveEnabled() // and no DR. - ) { - // This method can only be used when there are no replicated entries in the batch. - UpdateBatchResult updRes = updateWithBatch(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - - if (req.operation() == TRANSFORM) - retVal = updRes.invokeResults(); - } - else { - UpdateSingleResult updRes = updateSingle(node, - hasNear, - req, - res, - locked, - ver, - dhtFut, - ctx.isDrEnabled(), - taskName, - expiry, - sndPrevVal); - - retVal = updRes.returnValue(); - deleted = updRes.deleted(); - dhtFut = updRes.dhtFuture(); - } + DhtAtomicUpdateResult updRes = update(node, locked, req, res); - if (retVal == null) - retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true); - - res.returnValue(retVal); - - if (dhtFut != null) { - if (req.writeSynchronizationMode() == PRIMARY_SYNC - // To avoid deadlock disable back-pressure for sender data node. - && !ctx.discovery().cacheAffinityNode(node, ctx.name()) - && !dhtFut.isDone()) { - final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); - - if (tracker != null && tracker instanceof GridNioMessageTracker) { - ((GridNioMessageTracker)tracker).onMessageReceived(); - - dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> fut) { - ((GridNioMessageTracker)tracker).onMessageProcessed(); - } - }); - } - } - - ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut); - } + dhtFut = updRes.dhtFuture(); + deleted = updRes.deleted(); + expiry = updRes.expiryPolicy(); } else { // Should remap all keys. @@ -1953,9 +1856,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); } - else + else { if (dhtFut != null) dhtFut.map(node, res.returnValue(), res, completionCb); + } if (req.writeSynchronizationMode() != FULL_ASYNC) req.cleanup(!node.isLocal()); @@ -1964,6 +1868,122 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * @param node Node. + * @param locked Entries. + * @param req Request. + * @param res Response. + * @return Operation result. + * @throws GridCacheEntryRemovedException If got obsolete entry. + */ + private DhtAtomicUpdateResult update( + ClusterNode node, + List<GridDhtCacheEntry> locked, + GridNearAtomicAbstractUpdateRequest req, + GridNearAtomicUpdateResponse res) + throws GridCacheEntryRemovedException + { + GridDhtPartitionTopology top = topology(); + + String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + + boolean hasNear = req.nearCache(); + + // Assign next version for update inside entries lock. + GridCacheVersion ver = ctx.versions().next(top.topologyVersion()); + + if (hasNear) + res.nearVersion(ver); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Assigned update version [futId=" + req.futureId() + + ", writeVer=" + ver + ']'); + } + + assert ver != null : "Got null version for update request: " + req; + + boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); + + GridDhtAtomicAbstractUpdateFuture dhtFut = createDhtFuture(ver, req); + + IgniteCacheExpiryPolicy expiry = expiryPolicy(req.expiry()); + + GridCacheReturn retVal = null; + + DhtAtomicUpdateResult updRes; + + if (req.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + // (conflict resolver should be used for local store) + !ctx.dr().receiveEnabled() // and no DR. + ) { + // This method can only be used when there are no replicated entries in the batch. + updRes = updateWithBatch(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + dhtFut = updRes.dhtFuture(); + + if (req.operation() == TRANSFORM) + retVal = updRes.returnValue(); + } + else { + updRes = updateSingle(node, + hasNear, + req, + res, + locked, + ver, + dhtFut, + ctx.isDrEnabled(), + taskName, + expiry, + sndPrevVal); + + retVal = updRes.returnValue(); + dhtFut = updRes.dhtFuture(); + } + + if (retVal == null) + retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true); + + res.returnValue(retVal); + + if (dhtFut != null) { + if (req.writeSynchronizationMode() == PRIMARY_SYNC + // To avoid deadlock disable back-pressure for sender data node. + && !ctx.discovery().cacheAffinityNode(node, ctx.name()) + && !dhtFut.isDone()) { + final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); + + if (tracker != null && tracker instanceof GridNioMessageTracker) { + ((GridNioMessageTracker)tracker).onMessageReceived(); + + dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> fut) { + ((GridNioMessageTracker)tracker).onMessageProcessed(); + } + }); + } + } + + ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut); + } + + updRes.expiryPolicy(expiry); + + return updRes; + } + + /** * Updates locked entries using batched write-through. * * @param node Sender node. @@ -1981,7 +2001,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should not be thrown. */ @SuppressWarnings("unchecked") - private UpdateBatchResult updateWithBatch( + private DhtAtomicUpdateResult updateWithBatch( final ClusterNode node, final boolean hasNear, final GridNearAtomicAbstractUpdateRequest req, @@ -2004,7 +2024,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { catch (IgniteCheckedException e) { res.addFailedKeys(req.keys(), e); - return new UpdateBatchResult(); + return new DhtAtomicUpdateResult(); } } @@ -2018,7 +2038,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { List<CacheObject> writeVals = null; - UpdateBatchResult updRes = new UpdateBatchResult(); + DhtAtomicUpdateResult updRes = new DhtAtomicUpdateResult(); List<GridDhtCacheEntry> filtered = new ArrayList<>(size); @@ -2317,7 +2337,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.dhtFuture(dhtFut); - updRes.invokeResult(invokeRes); + updRes.returnValue(invokeRes); return updRes; } @@ -2390,7 +2410,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ - private UpdateSingleResult updateSingle( + private DhtAtomicUpdateResult updateSingle( ClusterNode nearNode, boolean hasNear, GridNearAtomicAbstractUpdateRequest req, @@ -2577,7 +2597,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - return new UpdateSingleResult(retVal, deleted, dhtFut); + return new DhtAtomicUpdateResult(retVal, deleted, dhtFut); } /** @@ -2615,7 +2635,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, - final UpdateBatchResult batchRes, + final DhtAtomicUpdateResult batchRes, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, final boolean sndPrevVal @@ -3060,7 +3080,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ", node=" + nodeId + ']'); } - updateAllAsyncInternal(nodeId, req, updateReplyClos); + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + U.warn(msgLog, "Skip near update request, node originated update request left [" + + "futId=" + req.futureId() + ", node=" + nodeId + ']'); + + return; + } + + updateAllAsyncInternal(node, req, updateReplyClos); } /** @@ -3541,119 +3570,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** - * Result of {@link GridDhtAtomicCache#updateSingle} execution. - */ - private static class UpdateSingleResult { - /** */ - private final GridCacheReturn retVal; - - /** */ - private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; - - /** */ - private final GridDhtAtomicAbstractUpdateFuture dhtFut; - - /** - * @param retVal Return value. - * @param deleted Deleted entries. - * @param dhtFut DHT future. - */ - private UpdateSingleResult(GridCacheReturn retVal, - Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, - GridDhtAtomicAbstractUpdateFuture dhtFut) { - this.retVal = retVal; - this.deleted = deleted; - this.dhtFut = dhtFut; - } - - /** - * @return Return value. - */ - private GridCacheReturn returnValue() { - return retVal; - } - - /** - * @return Deleted entries. - */ - private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { - return deleted; - } - - /** - * @return DHT future. - */ - public GridDhtAtomicAbstractUpdateFuture dhtFuture() { - return dhtFut; - } - } - - /** - * Result of {@link GridDhtAtomicCache#updateWithBatch} execution. - */ - private static class UpdateBatchResult { - /** */ - private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; - - /** */ - private GridDhtAtomicAbstractUpdateFuture dhtFut; - - /** */ - private GridCacheReturn invokeRes; - - /** - * @param entry Entry. - * @param updRes Entry update result. - * @param entries All entries. - */ - private void addDeleted(GridDhtCacheEntry entry, - GridCacheUpdateAtomicResult updRes, - Collection<GridDhtCacheEntry> entries) { - if (updRes.removeVersion() != null) { - if (deleted == null) - deleted = new ArrayList<>(entries.size()); - - deleted.add(F.t(entry, updRes.removeVersion())); - } - } - - /** - * @return Deleted entries. - */ - private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { - return deleted; - } - - /** - * @return DHT future. - */ - public GridDhtAtomicAbstractUpdateFuture dhtFuture() { - return dhtFut; - } - - /** - * @param invokeRes Result for invoke operation. - */ - private void invokeResult(GridCacheReturn invokeRes) { - this.invokeRes = invokeRes; - } - - /** - * @return Result for invoke operation. - */ - GridCacheReturn invokeResults() { - return invokeRes; - } - - /** - * @param dhtFut DHT future. - */ - private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) { - this.dhtFut = dhtFut; - } - } - - /** * */ private static class FinishedLockFuture extends GridFinishedFuture<Boolean> implements GridDhtFuture<Boolean> { http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 6fe96a4..983b18a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -296,7 +296,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture */ final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, + cache.updateAllAsyncInternal(cctx.localNode(), req, new GridDhtAtomicCache.UpdateReplyClosure() { @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { if (syncMode != FULL_ASYNC) http://git-wip-us.apache.org/repos/asf/ignite/blob/5704e393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 138645d..930012a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -706,7 +706,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + cache.updateAllAsyncInternal(cctx.localNode(), locUpdate, new GridDhtAtomicCache.UpdateReplyClosure() { @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { if (syncMode != FULL_ASYNC) @@ -730,7 +730,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param topVer Topology version. * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + private void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) {
