This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll-backup
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit f37831b307cfc890bb5dc374c0023ccac7be76af
Author: sboikov <sboi...@apache.org>
AuthorDate: Sun Feb 24 14:09:23 2019 +0300

    invokeAll
---
 .../processors/cache/GridCacheMapEntry.java        |  11 +-
 .../cache/IgniteCacheOffheapManager.java           |   7 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  27 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 762 ++++++++++++++-------
 .../cache/persistence/GridCacheOffheapManager.java |   9 +-
 .../cache/persistence/tree/BPlusTree.java          |   6 +-
 .../processors/cache/tree/SearchRowEx.java         |  45 ++
 7 files changed, 599 insertions(+), 268 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 23b14b6..acd4349 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2239,7 +2239,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, 
GridClosureException {
         assert cctx.atomic() && !detached();
 
-        if (!primary && !isNear())
+        if (!primary && !isNear() && c == null)
             ensureFreeSpace();
 
         if (!primary) {
@@ -5903,7 +5903,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private boolean wasIntercepted;
 
         /** */
-        AtomicCacheUpdateClosure(
+        public AtomicCacheUpdateClosure(
             GridCacheMapEntry entry,
             AffinityTopologyVersion topVer,
             GridCacheVersion newVer,
@@ -5960,6 +5960,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
         }
 
+        /**
+         * @return Update result.
+         */
+        public GridCacheUpdateAtomicResult updateResult() {
+            return updateRes;
+        }
+
         /** {@inheritDoc} */
         @Nullable @Override public CacheDataRow oldRow() {
             return oldRow;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 870d99f..9455c2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -200,7 +201,7 @@ public interface IgniteCacheOffheapManager {
     public void invokeAll(GridCacheContext cctx,
         GridDhtLocalPartition part,
         Collection<? extends CacheSearchRow> rows,
-        Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
+        Function<CacheSearchRow, OffheapInvokeClosure> closures)
         throws IgniteCheckedException;
 
     /**
@@ -899,7 +900,7 @@ public interface IgniteCacheOffheapManager {
          * @return Cache search row.
          * @throws IgniteCheckedException If failed.
          */
-        public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key) throws IgniteCheckedException;
+        public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key, Object data) throws IgniteCheckedException;
 
         /**
          * @return Rows comparator.
@@ -915,7 +916,7 @@ public interface IgniteCacheOffheapManager {
          */
         public void invokeAll(GridCacheContext cctx,
             Collection<? extends CacheSearchRow> rows,
-            Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
+            Function<CacheSearchRow, OffheapInvokeClosure> closures)
             throws IgniteCheckedException;
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 3ae5b81..0a621dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -80,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.cache.tree.RowLinkIO;
 import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
 import 
org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow;
 import 
org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
@@ -456,9 +458,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         GridCacheContext cctx,
         GridDhtLocalPartition part,
         Collection<? extends CacheSearchRow> rows,
-        Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
+        Function<CacheSearchRow, OffheapInvokeClosure> closures)
         throws IgniteCheckedException {
-        dataStore(part).invokeAll(cctx, rows, map);
+        dataStore(part).invokeAll(cctx, rows, closures);
     }
 
     /** {@inheritDoc} */
@@ -1650,8 +1652,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public SearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key) {
-            return new SearchRow(grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID, key);
+        @Override public SearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key, Object data) {
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
+
+            return data != null ? new SearchRowEx<>(cacheId, key, data) : new 
SearchRow(cacheId, key);
         }
 
         /** {@inheritDoc} */
@@ -1660,7 +1664,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public void invokeAll(GridCacheContext cctx, Collection<? 
extends CacheSearchRow> rows, Map<? extends CacheSearchRow, ? extends 
OffheapInvokeClosure> map) throws IgniteCheckedException {
+        @Override public void invokeAll(GridCacheContext cctx,
+                Collection<? extends CacheSearchRow> rows,
+                Function<CacheSearchRow, OffheapInvokeClosure> closures)
+                throws IgniteCheckedException {
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
 
@@ -1668,16 +1675,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 RuntimeException err = null;
 
                 try {
-                    dataTree.invokeAll(rows.iterator(), 
CacheDataRowAdapter.RowData.NO_KEY, map::get);
+                    dataTree.invokeAll(rows.iterator(), 
CacheDataRowAdapter.RowData.NO_KEY, closures);
                 }
                 catch (UnregisteredClassException | 
UnregisteredBinaryTypeException clsErr) {
                     err = clsErr;
                 }
 
-                for (Map.Entry<? extends CacheSearchRow, ? extends 
OffheapInvokeClosure> e : map.entrySet()) {
+                for (CacheSearchRow row : rows) {
+                    OffheapInvokeClosure c = closures.apply(row);
+
                     // Update could be interrupted in the middle, finish 
update only for processed entries.
-                    if (e.getValue().operationType() != null)
-                        finishInvoke(cctx, e.getKey().key(), e.getValue());
+                    if (c.operationType() != null)
+                        finishInvoke(cctx, row.key(), c);
                 }
 
                 if (err != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 329165a..cf23e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import 
org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheBatchUpdateClosure;
+import 
org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheUpdateClosure;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -95,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -1760,6 +1762,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             DhtAtomicUpdateResult  updDhtRes = new DhtAtomicUpdateResult();
 
             try {
+                Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> byPart = null;
+
+                if (req.size() > 1 &&                    // Several keys ...
+                    writeThrough() && !req.skipStore() && // and store is 
enabled ...
+                    !ctx.store().isLocal() &&             // and this is not 
local store ...
+                    // (conflict resolver should be used for local store)
+                    !ctx.dr().receiveEnabled()            // and no DR.
+                ) {
+                    byPart = new HashMap<>();
+                }
+
                 while (true) {
                     try {
                         GridDhtPartitionTopology top = topology();
@@ -1855,7 +1868,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                     }
                                 }
 
-                                update(node, locked, req, res, updDhtRes);
+                                update(node, locked, req, res, updDhtRes, 
byPart);
 
                                 dhtFut = updDhtRes.dhtFuture();
                                 deleted = updDhtRes.deleted();
@@ -1964,15 +1977,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param req Request.
      * @param res Response.
      * @param dhtUpdRes DHT update result
-     * @return Operation result.
      * @throws GridCacheEntryRemovedException If got obsolete entry.
      */
-    private DhtAtomicUpdateResult update(
+    private void update(
         ClusterNode node,
         List<GridDhtCacheEntry> locked,
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res,
-        DhtAtomicUpdateResult dhtUpdRes)
+        DhtAtomicUpdateResult dhtUpdRes,
+        @Nullable Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> byPart
+    )
         throws GridCacheEntryRemovedException
     {
         GridDhtPartitionTopology top = topology();
@@ -2006,12 +2020,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         ctx.group().listenerLock().readLock().lock();
 
         try {
-            if (req.size() > 1 &&                    // Several keys ...
-                writeThrough() && !req.skipStore() && // and store is enabled 
...
-                !ctx.store().isLocal() &&             // and this is not local 
store ...
-                // (conflict resolver should be used for local store)
-                !ctx.dr().receiveEnabled()            // and no DR.
-            ) {
+            if (byPart != null) {
                 // This method can only be used when there are no replicated 
entries in the batch.
                 updateWithBatch(node,
                     hasNear,
@@ -2023,7 +2032,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     taskName,
                     expiry,
                     sndPrevVal,
-                    dhtUpdRes);
+                    dhtUpdRes,
+                    byPart);
 
                 if (req.operation() == TRANSFORM)
                     retVal = dhtUpdRes.returnValue();
@@ -2077,8 +2087,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         dhtUpdRes.expiryPolicy(expiry);
-
-        return dhtUpdRes;
     }
 
     /**
@@ -2109,7 +2117,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
         final boolean sndPrevVal,
-        final DhtAtomicUpdateResult dhtUpdRes
+        final DhtAtomicUpdateResult dhtUpdRes,
+        final Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> byPart
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during 
DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should 
not request return values for putAll.
@@ -2270,22 +2279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         // Update previous batch.
                         if (putMap != null) {
                             updatePartialBatch(
-                                hasNear,
                                 firstEntryIdx,
                                 filtered,
                                 ver,
-                                node,
                                 writeVals,
                                 putMap,
                                 null,
-                                entryProcessorMap,
                                 req,
                                 res,
-                                replicate,
                                 dhtUpdRes,
-                                taskName,
                                 expiry,
-                                sndPrevVal);
+                                byPart);
 
                             firstEntryIdx = i;
 
@@ -2317,22 +2321,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         // Update previous batch.
                         if (rmvKeys != null) {
                             updatePartialBatch(
-                                hasNear,
                                 firstEntryIdx,
                                 filtered,
                                 ver,
-                                node,
                                 null,
                                 null,
                                 rmvKeys,
-                                entryProcessorMap,
                                 req,
                                 res,
-                                replicate,
                                 dhtUpdRes,
-                                taskName,
                                 expiry,
-                                sndPrevVal);
+                                byPart);
 
                             firstEntryIdx = i;
 
@@ -2440,27 +2439,65 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         // Store final batch.
         if (putMap != null || rmvKeys != null) {
             updatePartialBatch(
-                hasNear,
                 firstEntryIdx,
                 filtered,
                 ver,
-                node,
                 writeVals,
                 putMap,
                 rmvKeys,
-                entryProcessorMap,
                 req,
                 res,
-                replicate,
                 dhtUpdRes,
-                taskName,
                 expiry,
-                sndPrevVal);
+                byPart);
         }
         else
             assert filtered.isEmpty();
 
         dhtUpdRes.returnValue(invokeRes);
+
+        AffinityAssignment affAssignment = 
ctx.affinity().assignment(req.topologyVersion());
+
+        GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
+
+        for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) {
+            try {
+                Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = 
e0.getValue();
+
+                ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), 
map::get);
+
+                for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> 
e : map.entrySet()) {
+                    AtomicCacheBatchUpdateClosure c = e.getValue();
+
+                    updateSingleEntryPartialBatch(
+                        (GridDhtCacheEntry)c.entry(),
+                        c,
+                        ver,
+                        (CacheObject)c.writeValue(),
+                        entryProcessorMap,
+                        node,
+                        hasNear,
+                        taskName,
+                        expiry,
+                        drType,
+                        req,
+                        res,
+                        dhtUpdRes,
+                        c.reqIdx,
+                        affAssignment,
+                        sndPrevVal);
+                }
+            }
+            catch (GridCacheEntryRemovedException e) {
+                assert false : "Entry cannot become obsolete while holding 
lock.";
+
+                e.printStackTrace();
+            }
+            catch (IgniteCheckedException e) {
+                for (CacheSearchRow row : e0.getValue().keySet())
+                    res.addFailedKey(row.key(), e);
+            }
+        }
     }
 
     /**
@@ -2614,7 +2651,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         ctx.disableTriggeringCacheInterceptorOnConflict()
                     );
 
-                    map.put(dataStore.createSearchRow(ctx, entry.key()), c);
+                    map.put(dataStore.createSearchRow(ctx, entry.key(), null), 
c);
                 }
                 catch (IgniteCheckedException e) {
                     res.addFailedKey(entry.key(), e);
@@ -2628,7 +2665,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = 
e0.getValue();
 
                     try {
-                        ctx.offheap().invokeAll(ctx, e0.getKey(), 
map.keySet(), map);
+                        ctx.offheap().invokeAll(ctx, e0.getKey(), 
map.keySet(), map::get);
                     }
                     catch (UnregisteredClassException | 
UnregisteredBinaryTypeException e) {
                         err = e;
@@ -2915,22 +2952,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param sndPrevVal If {@code true} sends previous value to backups.
      */
     private void updatePartialBatch(
-        final boolean hasNear,
         final int firstEntryIdx,
         final List<GridDhtCacheEntry> entries,
         final GridCacheVersion ver,
-        final ClusterNode nearNode,
         @Nullable final List<CacheObject> writeVals,
         @Nullable final Map<KeyCacheObject, CacheObject> putMap,
         @Nullable final Collection<KeyCacheObject> rmvKeys,
-        @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, 
Object>> entryProcessorMap,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
-        final boolean replicate,
         final DhtAtomicUpdateResult dhtUpdRes,
-        final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
-        final boolean sndPrevVal
+        Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> byPart
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -2971,17 +3003,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 op = DELETE;
             }
 
-            AffinityAssignment affAssignment = 
ctx.affinity().assignment(topVer);
-
-            GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
-
             Collection<Object> failedToUnwrapKeys = null;
 
             int cnt = entries.size();
 
-            Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> byPart =
-                cnt > 1 ? new HashMap<>() : null;
-
             // Avoid iterator creation.
             for (int i = 0; i < cnt; i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -3015,119 +3040,49 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         continue;
                 }
 
-                try {
-                    // We are holding java-level locks on entries at this 
point.
-                    CacheObject writeVal = op == UPDATE ? writeVals.get(i) : 
null;
+                // We are holding java-level locks on entries at this point.
+                CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null;
 
-                    assert writeVal != null || op == DELETE : "null write 
value found.";
+                assert writeVal != null || op == DELETE : "null write value 
found.";
 
-                    if (byPart != null) {
-                        GridDhtLocalPartition part = entry.localPartition();
+                GridDhtLocalPartition part = entry.localPartition();
 
-                        TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> 
map = byPart.get(part);
+                TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = 
byPart.get(part);
 
-                        IgniteCacheOffheapManager.CacheDataStore dataStore = 
ctx.offheap().dataStore(part);
+                IgniteCacheOffheapManager.CacheDataStore dataStore = 
ctx.offheap().dataStore(part);
 
-                        if (map == null)
-                            byPart.put(part, map = new 
TreeMap<>(dataStore.rowsComparator()));
+                if (map == null)
+                    byPart.put(part, map = new 
TreeMap<>(dataStore.rowsComparator()));
 
-                        AtomicCacheBatchUpdateClosure c = new 
AtomicCacheBatchUpdateClosure(
-                            firstEntryIdx + i,
-                            entry,
-                            topVer,
-                            ver,
-                            op,
-                            writeVal,
-                            req.invokeArguments(),
-                            /*read-through*/false,
-                            /*write-through*/false,
-                            req.keepBinary(),
-                            expiry,
-                            /*primary*/true,
-                            /*verCheck*/false,
-                            req.filter(),
-                            CU.TTL_NOT_CHANGED,
-                            CU.EXPIRE_TIME_CALCULATE,
-                            null,
-                            /*conflictResolve*/false,
-                            false,
-                            null,
-                            ctx.disableTriggeringCacheInterceptorOnConflict()
-                        );
-
-                        map.put(dataStore.createSearchRow(ctx, entry.key()), 
c);
-                    }
-                    else {
-                        updateSingleEntryPartialBatch(entry,
-                            null,
-                            op,
-                            ver,
-                            writeVal,
-                            entryProcessorMap,
-                            nearNode,
-                            hasNear,
-                            taskName,
-                            expiry,
-                            drType,
-                            req,
-                            res,
-                            dhtUpdRes,
-                            i + firstEntryIdx,
-                            affAssignment,
-                            sndPrevVal);
-                    }
-                }
-                catch (GridCacheEntryRemovedException e) {
-                    assert false : "Entry cannot become obsolete while holding 
lock.";
+                AtomicCacheBatchUpdateClosure c = new 
AtomicCacheBatchUpdateClosure(
+                    firstEntryIdx + i,
+                    entry,
+                    topVer,
+                    ver,
+                    op,
+                    writeVal,
+                    req.invokeArguments(),
+                    /*read-through*/false,
+                    /*write-through*/false,
+                    req.keepBinary(),
+                    expiry,
+                    /*primary*/true,
+                    /*verCheck*/false,
+                    req.filter(),
+                    CU.TTL_NOT_CHANGED,
+                    CU.EXPIRE_TIME_CALCULATE,
+                    null,
+                    /*conflictResolve*/false,
+                    false,
+                    null,
+                    ctx.disableTriggeringCacheInterceptorOnConflict()
+                );
 
-                    e.printStackTrace();
-                }
+                map.put(dataStore.createSearchRow(ctx, entry.key(), null), c);
 
                 dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1);
             }
 
-            if (byPart != null) {
-                for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, 
AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) {
-                    try {
-                        Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map 
= e0.getValue();
-
-                        ctx.offheap().invokeAll(ctx, e0.getKey(), 
map.keySet(), map);
-
-                        for (Map.Entry<CacheSearchRow, 
AtomicCacheBatchUpdateClosure> e : map.entrySet()) {
-                            AtomicCacheBatchUpdateClosure c = e.getValue();
-
-                            updateSingleEntryPartialBatch(
-                                (GridDhtCacheEntry)c.entry(),
-                                c,
-                                op,
-                                ver,
-                                (CacheObject)c.writeValue(),
-                                entryProcessorMap,
-                                nearNode,
-                                hasNear,
-                                taskName,
-                                expiry,
-                                drType,
-                                req,
-                                res,
-                                dhtUpdRes,
-                                c.reqIdx,
-                                affAssignment,
-                                sndPrevVal);
-                        }
-                    }
-                    catch (GridCacheEntryRemovedException e) {
-                        assert false : "Entry cannot become obsolete while 
holding lock.";
-
-                        e.printStackTrace();
-                    }
-                    catch (IgniteCheckedException e) {
-                        for (CacheSearchRow row : e0.getValue().keySet())
-                            res.addFailedKey(row.key(), e);
-                    }
-                }
-            }
-
             if (failedToUnwrapKeys != null) {
                 log.warning("Failed to get values of keys: " + 
failedToUnwrapKeys +
                     " (the binary objects will be used instead).");
@@ -3151,7 +3106,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private void updateSingleEntryPartialBatch(
         GridDhtCacheEntry entry,
         @Nullable AtomicCacheBatchUpdateClosure c,
-        GridCacheOperation op,
         GridCacheVersion ver,
         @Nullable final CacheObject writeVal,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, 
Object>> entryProcessorMap,
@@ -3177,7 +3131,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         // Get readers before innerUpdate (reader cleared after remove).
         GridDhtCacheEntry.ReaderId[] readers = c != null ? c.rdrs : 
entry.readersLocked();
 
-        assert writeVal != null || op == DELETE : "null write value found.";
+        GridCacheOperation op = writeVal == null ? DELETE : UPDATE;
 
         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
             c,
@@ -3583,135 +3537,449 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         sendNearUpdateReply(nodeId, res);
     }
 
-    /**
-     * @param nodeId Sender node ID.
-     * @param req Dht atomic update request.
-     */
-    private void processDhtAtomicUpdateRequest(UUID nodeId, 
GridDhtAtomicAbstractUpdateRequest req) {
-        assert Thread.currentThread().getName().startsWith("sys-stripe-") : 
Thread.currentThread().getName();
+    /**
+     * Splits the keys of a DHT update request into runs of consecutive keys that belong to the same
+     * partition and passes every run to the per-batch overload of this method.
+     * <p>
+     * Only adjacent keys are grouped: keys of one partition that are separated by a key of another
+     * partition end up in different batches.
+     *
+     * @param nodeId Sender node ID.
+     * @param req Dht atomic update request.
+     * @param nearRes Response for the near node, or {@code null}.
+     * @param taskName Task name.
+     * @param writeThrough Write-through flag passed on to the entry update.
+     * @param intercept Intercept flag passed on to the entry update.
+     * @throws NodeStoppingException Propagated from the per-batch update.
+     */
+    private void dhtAtomicUpdateRequestUpdateBatch(
+        UUID nodeId,
+        GridDhtAtomicAbstractUpdateRequest req,
+        @Nullable GridDhtAtomicNearResponse nearRes,
+        String taskName,
+        boolean writeThrough,
+        boolean intercept
+    ) throws NodeStoppingException {
+        // Partition of the previous key; -1 until the first key has been seen.
+        int curPart = -1;
 
-        if (msgLog.isDebugEnabled()) {
-            msgLog.debug("Received DHT atomic update request [futId=" + 
req.futureId() +
-                ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
-        }
+        int batchStart = 0;
+        int batchSize = 0;
 
-        assert req.partition() >= 0 : req;
+        for (int i = 0; i < req.size(); i++) {
+            KeyCacheObject key = req.key(i);
 
-        GridCacheVersion ver = req.writeVersion();
+            int part = key.partition();
 
-        GridDhtAtomicNearResponse nearRes = null;
+            assert part >= 0 : key;
 
-        if (req.nearNodeId() != null) {
-            nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
-                req.partition(),
-                req.nearFutureId(),
+            // Partition changed: flush the accumulated run and start a new one at the current key.
+            if (curPart >=0 && curPart != part) {
+                dhtAtomicUpdateRequestUpdateBatch(
+                    batchStart,
+                    batchSize,
+                    nodeId,
+                    req,
+                    nearRes,
+                    taskName,
+                    writeThrough,
+                    intercept);
+
+                batchStart = i;
+                batchSize = 1;
+            }
+            else
+                batchSize++;
+
+            curPart = part;
+        }
+
+        // Flush the trailing run (batchSize stays 0 only for an empty request).
+        if (batchSize > 0) {
+            dhtAtomicUpdateRequestUpdateBatch(
+                batchStart,
+                batchSize,
                 nodeId,
-                req.flags());
+                req,
+                nearRes,
+                taskName,
+                writeThrough,
+                intercept);
         }
+    }
 
-        boolean replicate = ctx.isDrEnabled();
+    /**
+     * Creates an update closure and a search row for every key of the batch and locks the
+     * corresponding cache entries.
+     * <p>
+     * If an entry turns out to be obsolete after it has been locked, all entries locked so far are
+     * unlocked and the whole batch is rebuilt from scratch.
+     *
+     * @param batchStart Index of the first batch item in the request.
+     * @param batchSize Number of items in the batch.
+     * @param req Dht atomic update request.
+     * @param writeThrough Write-through flag passed to the update closures.
+     * @param intercept Intercept flag passed to the update closures.
+     * @return Search rows (in request order) whose entries are locked, or {@code null} if
+     *      {@link GridDhtInvalidPartitionException} was thrown and the backup needs no update.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private List<SearchRowEx<AtomicCacheUpdateClosure>> 
dhtAtomicUpdateLockBatch(
+        final int batchStart,
+        final int batchSize,
+        final GridDhtAtomicAbstractUpdateRequest req,
+        boolean writeThrough,
+        boolean intercept
+    ) throws IgniteCheckedException {
+        try {
+            List<SearchRowEx<AtomicCacheUpdateClosure>> locked = new 
ArrayList<>(batchSize);
 
-        boolean intercept = req.forceTransformBackups() && 
ctx.config().getInterceptor() != null;
+            SearchRowEx<AtomicCacheUpdateClosure> prev = null;
 
-        String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+            GridDhtLocalPartition part = null;
 
-        ctx.shared().database().checkpointReadLock();
+            while (true) {
+                for (int i = 0; i < batchSize; i++) {
+                    // 'i' is relative to the batch, every request accessor below must use 'idx'.
+                    int idx = batchStart + i;
 
-        try {
-            for (int i = 0; i < req.size(); i++) {
-                KeyCacheObject key = req.key(i);
+                    KeyCacheObject key = req.key(idx);
 
-                try {
-                    while (true) {
-                        GridDhtCacheEntry entry = null;
+                    GridDhtCacheEntry entry = entryExx(key);
 
-                        try {
-                            entry = entryExx(key);
+                    if (part == null)
+                        part = entry.localPartition();
+                    else
+                        assert part == entry.localPartition();
 
-                            CacheObject val = req.value(i);
-                            CacheObject prevVal = req.previousValue(i);
+                    CacheObject val = req.value(idx);
 
-                            EntryProcessor<Object, Object, Object> 
entryProcessor = req.entryProcessor(i);
-                            Long updateIdx = req.updateCounter(i);
+                    EntryProcessor<Object, Object, Object> entryProcessor = 
req.entryProcessor(idx);
+                    Long updateIdx = req.updateCounter(idx);
 
-                            GridCacheOperation op = entryProcessor != null ? 
TRANSFORM :
-                                (val != null) ? UPDATE : DELETE;
+                    GridCacheOperation op = entryProcessor != null ? TRANSFORM 
: (val != null) ? UPDATE : DELETE;
 
-                            long ttl = req.ttl(i);
-                            long expireTime = req.conflictExpireTime(i);
+                    long ttl = req.ttl(idx);
+                    long expireTime = req.conflictExpireTime(idx);
 
-                            GridCacheUpdateAtomicResult updRes = 
entry.innerUpdate(
-                                null,
-                                ver,
-                                nodeId,
-                                nodeId,
-                                op,
-                                op == TRANSFORM ? entryProcessor : val,
-                                op == TRANSFORM ? req.invokeArguments() : null,
-                                /*write-through*/(ctx.store().isLocal() && 
!ctx.shared().localStorePrimaryOnly())
-                                    && writeThrough() && !req.skipStore(),
-                                /*read-through*/false,
-                                /*retval*/false,
-                                req.keepBinary(),
-                                /*expiry policy*/null,
-                                /*event*/true,
-                                /*metrics*/true,
-                                /*primary*/false,
-                                /*check version*/!req.forceTransformBackups(),
-                                req.topologyVersion(),
-                                CU.empty0(),
-                                replicate ? DR_BACKUP : DR_NONE,
-                                ttl,
-                                expireTime,
-                                req.conflictVersion(i),
-                                false,
-                                intercept,
-                                req.subjectId(),
-                                taskName,
-                                prevVal,
-                                updateIdx,
-                                null,
-                                req.transformOperation());
+                    AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure(
+                        entry,
+                        req.topologyVersion(),
+                        req.writeVersion(),
+                        op,
+                        op == TRANSFORM ? entryProcessor : val,
+                        op == TRANSFORM ? req.invokeArguments() : null,
+                        /*read-through*/false,
+                        writeThrough,
+                        req.keepBinary(),
+                        /*expiry policy*/null,
+                        /*primary*/false,
+                        /*check version*/!req.forceTransformBackups(),
+                        CU.empty0(),
+                        ttl,
+                        expireTime,
+                        req.conflictVersion(idx),
+                        false,
+                        intercept,
+                        updateIdx,
+                        ctx.disableTriggeringCacheInterceptorOnConflict());
 
-                            if (updRes.removeVersion() != null)
-                                ctx.onDeferredDelete(entry, 
updRes.removeVersion());
+                    SearchRowEx<AtomicCacheUpdateClosure> row = 
(SearchRowEx<AtomicCacheUpdateClosure>)
+                        part.dataStore().createSearchRow(ctx, key, c);
 
-                            entry.onUnlock();
+                    locked.add(row);
 
-                            break; // While.
-                        }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry while updating 
backup value (will retry): " + key);
+                    // Expect keys in request are already sorted.
+                    assert prev == null || 
part.dataStore().rowsComparator().compare(row, prev) >= 0;
 
-                            entry = null;
-                        }
-                        finally {
-                            if (entry != null)
-                                entry.touch();
-                        }
+                    prev = row;
+                }
+
+                boolean retry = false;
+
+                for (int i = 0; i < locked.size(); i++) {
+                    SearchRowEx<AtomicCacheUpdateClosure> row = locked.get(i);
+
+                    GridCacheMapEntry entry = row.data().entry();
+
+                    entry.lockEntry();
+
+                    if (entry.obsolete()) {
+                        // Unlock all locked.
+                        for (int j = 0; j <= i; j++)
+                            locked.get(j).data().entry().unlockEntry();
+
+                        // Clear entries.
+                        locked.clear();
+
+                        // The next attempt starts from the first key again: forget ordering and
+                        // partition state, otherwise the asserts above compare against stale values.
+                        prev = null;
+                        part = null;
+
+                        // Retry.
+                        retry = true;
+
+                        break;
                     }
                 }
-                catch (NodeStoppingException e){
-                    U.warn(log, "Failed to update key on backup (local node is 
stopping): " + key);
 
-                    return;
+                if (!retry)
+                    return locked;
+            }
+        }
+        catch (GridDhtInvalidPartitionException e) {
+            // Ignore, do not need update backup.
+            return null;
+        }
+    }
+
+    /**
+     * Extracts the update closure attached to a search row.
+     *
+     * @param row Search row; it is cast to {@link SearchRowEx} without a check.
+     * @return Closure stored in the row.
+     */
+    private static AtomicCacheUpdateClosure rowClosure(CacheSearchRow row) {
+        SearchRowEx<AtomicCacheUpdateClosure> rowEx = (SearchRowEx<AtomicCacheUpdateClosure>)row;
+
+        return rowEx.data();
+    }
+
+    /**
+     * Applies one batch of a DHT update request on the backup node.
+     * <p>
+     * A batch of size one is delegated to {@code dhtAtomicUpdateRequestUpdateSingle}. Otherwise the
+     * entries are locked, the closures are applied to the data store with a single {@code invokeAll}
+     * call, every entry is updated, and finally all entries are unlocked and touched.
+     *
+     * @param batchStart Index of the first batch item in the request.
+     * @param batchSize Number of items in the batch, must be positive.
+     * @param nodeId Sender node ID.
+     * @param req Dht atomic update request.
+     * @param nearRes Response for the near node, or {@code null}.
+     * @param taskName Task name.
+     * @param writeThrough Write-through flag.
+     * @param intercept Intercept flag.
+     * @throws NodeStoppingException If the node is stopping.
+     */
+    private void dhtAtomicUpdateRequestUpdateBatch(
+        final int batchStart,
+        final int batchSize,
+        final UUID nodeId,
+        final GridDhtAtomicAbstractUpdateRequest req,
+        final @Nullable GridDhtAtomicNearResponse nearRes,
+        final String taskName,
+        final boolean writeThrough,
+        final boolean intercept
+    ) throws NodeStoppingException {
+        assert batchSize > 0 : batchSize;
+        assert batchStart >= 0 && batchStart < req.size() : batchStart;
+
+        if (batchSize == 1) {
+            dhtAtomicUpdateRequestUpdateSingle(
+                nodeId,
+                req,
+                nearRes,
+                taskName,
+                writeThrough,
+                intercept,
+                batchStart);
+
+            return;
+        }
+
+        try {
+            ctx.shared().database().ensureFreeSpace(ctx.dataRegion());
+
+            List<SearchRowEx<AtomicCacheUpdateClosure>> locked = 
dhtAtomicUpdateLockBatch(
+                batchStart,
+                batchSize,
+                req,
+                writeThrough,
+                intercept);
+
+            if (locked == null)
+                return;
+
+            assert !locked.isEmpty();
+
+            IgniteCacheOffheapManager.CacheDataStore dataStore = 
((GridDhtCacheEntry)locked.get(0).data().entry()).localPartition().dataStore();
+
+            try {
+                dataStore.invokeAll(ctx, locked, 
GridDhtAtomicCache::rowClosure);
+
+                for (int i = 0; i < batchSize; i++) {
+                    AtomicCacheUpdateClosure c = locked.get(i).data();
+
+                    dhtAtomicUpdateRequestUpdateSingleEntry(
+                        c.entry(),
+                        c,
+                        nodeId,
+                        req,
+                        nearRes,
+                        taskName,
+                        writeThrough,
+                        intercept,
+                        batchStart + i);
                 }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    // Ignore.
+            }
+            finally {
+                for (int i = 0; i < batchSize; i++)
+                    locked.get(i).data().entry().unlockEntry();
+
+                for (int i = 0; i < batchSize; i++) {
+                    AtomicCacheUpdateClosure c = locked.get(i).data();
+
+                    // NOTE(review): the result is presumably unset when the closure was never applied
+                    // (invokeAll or an earlier update failed). Guard it so that a failure here neither
+                    // masks the original exception nor skips onUnlock()/touch() for the other entries.
+                    if (c.updateResult() != null && c.updateResult().removeVersion() != null)
+                        ctx.onDeferredDelete(c.entry(), 
c.updateResult().removeVersion());
+
+                    GridCacheMapEntry entry = locked.get(i).data().entry();
+
+                    entry.onUnlock();
+
+                    entry.touch();
                 }
-                catch (IgniteCheckedException|RuntimeException e) {
-                    if(e instanceof RuntimeException && !X.hasCause(e, 
IgniteOutOfMemoryException.class))
-                        throw (RuntimeException)e;
+            }
+        }
+        catch (GridCacheEntryRemovedException e) {
+            // Entries are checked for obsolescence when locked, so this is unexpected; log it so it is
+            // not lost when assertions are disabled.
+            U.error(log, "Entry became obsolete while holding lock.", e);
+
+            assert false : "Entry cannot become obsolete while holding lock.";
+        }
+        catch (NodeStoppingException e) {
+            throw e;
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            if (e instanceof RuntimeException && !X.hasCause(e, 
IgniteOutOfMemoryException.class))
+                throw (RuntimeException)e;
+
+            for (int i = 0; i < batchSize; i++) {
+                KeyCacheObject key = req.key(batchStart + i);
+
+                IgniteCheckedException err = new 
IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+                if (nearRes != null)
+                    nearRes.addFailedKey(key, err);
+            }
+
+            U.error(log, "Failed to update keys on backup node", e);
+        }
+    }
+
+    /**
+     * Applies request item {@code idx} to the given entry through {@code innerUpdate}.
+     * <p>
+     * Checked failures and runtime failures caused by {@link IgniteOutOfMemoryException} are recorded
+     * as a failed key in {@code nearRes} (when present) and logged; other runtime exceptions are rethrown.
+     *
+     * @param entry Entry to update.
+     * @param c Update closure passed to {@code innerUpdate}, may be {@code null}.
+     * @param nodeId Sender node ID.
+     * @param req Dht atomic update request.
+     * @param nearRes Response for the near node, or {@code null}.
+     * @param taskName Task name.
+     * @param writeThrough Write-through flag.
+     * @param intercept Intercept flag.
+     * @param idx Index of the item in the request.
+     * @return Update result, or {@code null} if the update failed and the failure was recorded.
+     * @throws NodeStoppingException If the node is stopping.
+     * @throws GridCacheEntryRemovedException If the entry was removed.
+     */
+    private GridCacheUpdateAtomicResult 
dhtAtomicUpdateRequestUpdateSingleEntry(
+        GridCacheMapEntry entry,
+        AtomicCacheUpdateClosure c,
+        UUID nodeId,
+        GridDhtAtomicAbstractUpdateRequest req,
+        @Nullable GridDhtAtomicNearResponse nearRes,
+        String taskName,
+        boolean writeThrough,
+        boolean intercept,
+        int idx
+    ) throws NodeStoppingException, GridCacheEntryRemovedException {
+        final CacheObject newVal = req.value(idx);
+        final CacheObject oldVal = req.previousValue(idx);
+
+        final EntryProcessor<Object, Object, Object> proc = 
req.entryProcessor(idx);
+        final Long updCntr = req.updateCounter(idx);
+
+        final GridCacheOperation op;
+
+        if (proc != null)
+            op = TRANSFORM;
+        else if (newVal != null)
+            op = UPDATE;
+        else
+            op = DELETE;
+
+        final boolean transform = op == TRANSFORM;
+
+        final long ttl = req.ttl(idx);
+        final long expireTime = req.conflictExpireTime(idx);
+
+        try {
+            return entry.innerUpdate(
+                c,
+                req.writeVersion(),
+                nodeId,
+                nodeId,
+                op,
+                transform ? proc : newVal,
+                transform ? req.invokeArguments() : null,
+                writeThrough,
+                /*read-through*/false,
+                /*retval*/false,
+                req.keepBinary(),
+                /*expiry policy*/null,
+                /*event*/true,
+                /*metrics*/true,
+                /*primary*/false,
+                /*check version*/!req.forceTransformBackups(),
+                req.topologyVersion(),
+                CU.empty0(),
+                ctx.isDrEnabled() ? DR_BACKUP : DR_NONE,
+                ttl,
+                expireTime,
+                req.conflictVersion(idx),
+                false,
+                intercept,
+                req.subjectId(),
+                taskName,
+                oldVal,
+                updCntr,
+                null,
+                req.transformOperation());
+        }
+        catch (NodeStoppingException stopping) {
+            // Must be caught before the generic handler below so that it is propagated as is.
+            throw stopping;
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            // Runtime failures are recorded only when caused by out-of-memory, otherwise rethrown.
+            if (e instanceof RuntimeException && !X.hasCause(e, 
IgniteOutOfMemoryException.class))
+                throw (RuntimeException)e;
+
+            final KeyCacheObject failedKey = entry.key();
+
+            if (nearRes != null) {
+                nearRes.addFailedKey(
+                    failedKey,
+                    new IgniteCheckedException("Failed to update key on backup node: " + failedKey, e));
+            }
+
+            U.error(log, "Failed to update key on backup node: " + failedKey, e);
+
+            return null;
+        }
+    }
+
+    /**
+     * Updates a single key of a DHT update request on the backup node.
+     * <p>
+     * The update is retried with a freshly obtained entry while {@link GridCacheEntryRemovedException}
+     * is thrown. {@link GridDhtInvalidPartitionException} is ignored.
+     *
+     * @param nodeId Sender node ID.
+     * @param req Dht atomic update request.
+     * @param nearRes Response for the near node, or {@code null}.
+     * @param taskName Task name.
+     * @param writeThrough Write-through flag.
+     * @param intercept Intercept flag.
+     * @param i Index of the item in the request.
+     * @throws NodeStoppingException If the node is stopping.
+     */
+    private void dhtAtomicUpdateRequestUpdateSingle(
+        UUID nodeId,
+        GridDhtAtomicAbstractUpdateRequest req,
+        @Nullable GridDhtAtomicNearResponse nearRes,
+        String taskName,
+        boolean writeThrough,
+        boolean intercept,
+        int i) throws NodeStoppingException
+    {
+        KeyCacheObject key = req.key(i);
+
+        try {
+            while (true) {
+                GridDhtCacheEntry entry = null;
 
-                    IgniteCheckedException err = new 
IgniteCheckedException("Failed to update key on backup node: " + key, e);
+                try {
+                    entry = entryExx(key);
+
+                    // No closure is passed here (null), unlike the batch path.
+                    GridCacheUpdateAtomicResult updRes = 
dhtAtomicUpdateRequestUpdateSingleEntry(
+                        entry,
+                        null,
+                        nodeId,
+                        req,
+                        nearRes,
+                        taskName,
+                        writeThrough,
+                        intercept,
+                        i);
 
-                    if (nearRes != null)
-                        nearRes.addFailedKey(key, err);
+                    // updRes is null when the update failed and the failure was already recorded.
+                    if (updRes != null && updRes.removeVersion() != null)
+                        ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                    U.error(log, "Failed to update key on backup node: " + 
key, e);
+                    entry.onUnlock();
+
+                    break; // While.
                 }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry while updating backup 
value (will retry): " + key);
+
+                    // Reset so that the removed entry is not touched in the finally block.
+                    entry = null;
+                }
+                finally {
+                    if (entry != null)
+                        entry.touch();
+                }
+            }
+        }
+        catch (GridDhtInvalidPartitionException ignored) {
+            // Ignore.
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param req Dht atomic update request.
+     */
+    private void processDhtAtomicUpdateRequest(UUID nodeId, 
GridDhtAtomicAbstractUpdateRequest req) {
+        assert Thread.currentThread().getName().startsWith("sys-stripe-") : 
Thread.currentThread().getName();
+
+        if (msgLog.isDebugEnabled()) {
+            msgLog.debug("Received DHT atomic update request [futId=" + 
req.futureId() +
+                ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
+        }
+
+        assert req.partition() >= 0 : req;
+
+        GridDhtAtomicNearResponse nearRes = null;
+
+        if (req.nearNodeId() != null) {
+            nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+                req.partition(),
+                req.nearFutureId(),
+                nodeId,
+                req.flags());
+        }
+
+        String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+
+        boolean writeThrough = (ctx.store().isLocal() && 
!ctx.shared().localStorePrimaryOnly())
+            && writeThrough() && !req.skipStore();
+
+        boolean intercept = req.forceTransformBackups() && 
ctx.config().getInterceptor() != null;
+
+        ctx.shared().database().checkpointReadLock();
+
+        try {
+            if (req.size() > 1) {
+                dhtAtomicUpdateRequestUpdateBatch(
+                    nodeId,
+                    req,
+                    nearRes,
+                    taskName,
+                    writeThrough,
+                    intercept);
             }
+            else {
+                for (int i = 0; i < req.size(); i++)
+                    dhtAtomicUpdateRequestUpdateSingle(nodeId, req, nearRes, 
taskName, writeThrough, intercept, i);
+            }
+        }
+        catch (NodeStoppingException e){
+            U.warn(log, "Failed to update key on backup (local node is 
stopping)");
+
+            return;
         }
         finally {
             ctx.shared().database().checkpointReadUnlock();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index a15ff0d..57d3fd2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -2103,10 +2104,10 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key) throws IgniteCheckedException {
+        @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key, Object data) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            return delegate.createSearchRow(cctx, key);
+            return delegate.createSearchRow(cctx, key, data);
         }
 
         /** {@inheritDoc} */
@@ -2119,12 +2120,12 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void invokeAll(GridCacheContext cctx,
             Collection<? extends CacheSearchRow> rows,
-            Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map) 
throws IgniteCheckedException {
+            Function<CacheSearchRow, OffheapInvokeClosure> closures) throws 
IgniteCheckedException {
             assert ctx.database().checkpointLockIsHeldByThread();
 
             CacheDataStore delegate = init0(false);
 
-            delegate.invokeAll(cctx, rows, map);
+            delegate.invokeAll(cctx, rows, closures);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 63a3d2c..d794840 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1844,7 +1844,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
      * @param closures Provider of closures for each search row.
      * @throws IgniteCheckedException If failed.
      */
-    public void invokeAll(Iterator<? extends L> sortedRows, Object z, 
Function<L, InvokeClosure<T>> closures)
+    public void invokeAll(Iterator<? extends L> sortedRows, Object z, 
Function<L, ? extends InvokeClosure<T>> closures)
         throws IgniteCheckedException {
         doInvoke(new InvokeAll(sortedRows.next(), sortedRows, z, closures));
     }
@@ -4035,7 +4035,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         Iterator<? extends L> sortedRows;
 
         /** */
-        Function<L, InvokeClosure<T>> closures;
+        Function<L, ? extends InvokeClosure<T>> closures;
 
         /** */
         ReuseBag reuseBag;
@@ -4046,7 +4046,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param x Implementation specific argument.
          * @param clo Closure.
          */
-        InvokeAll(L firstRow, Iterator<? extends L> sortedRows, Object x, 
Function<L, InvokeClosure<T>> closures) {
+        InvokeAll(L firstRow, Iterator<? extends L> sortedRows, Object x, 
Function<L, ? extends InvokeClosure<T>> closures) {
             super(firstRow, x, closures.apply(firstRow));
 
             this.sortedRows = sortedRows;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java
new file mode 100644
index 0000000..469e7ba
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Search row that additionally carries an arbitrary data object.
+ *
+ * @param <T> Type of the attached data.
+ */
+public class SearchRowEx<T> extends SearchRow {
+    /** Data attached to the row. */
+    private final T data;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param data Data to attach to the row.
+     */
+    public SearchRowEx(int cacheId, KeyCacheObject key, T data) {
+        super(cacheId, key);
+
+        this.data = data;
+    }
+
+    /**
+     * @return Data.
+     */
+    public T data() {
+        return data;
+    }
+}

Reply via email to