This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll-backup
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit f37831b307cfc890bb5dc374c0023ccac7be76af
Author:     sboikov <sboi...@apache.org>
AuthorDate: Sun Feb 24 14:09:23 2019 +0300

    invokeAll
---
 .../processors/cache/GridCacheMapEntry.java        |  11 +-
 .../cache/IgniteCacheOffheapManager.java           |   7 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  27 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 762 ++++++++++++++-------
 .../cache/persistence/GridCacheOffheapManager.java |   9 +-
 .../cache/persistence/tree/BPlusTree.java          |   6 +-
 .../processors/cache/tree/SearchRowEx.java         |  45 ++
 7 files changed, 599 insertions(+), 268 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 23b14b6..acd4349 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2239,7 +2239,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic() && !detached();
 
-        if (!primary && !isNear())
+        if (!primary && !isNear() && c == null)
             ensureFreeSpace();
 
         if (!primary) {
@@ -5903,7 +5903,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
        private boolean wasIntercepted;
 
        /** */
-       AtomicCacheUpdateClosure(
+       public AtomicCacheUpdateClosure(
            GridCacheMapEntry entry,
            AffinityTopologyVersion topVer,
            GridCacheVersion newVer,
@@ -5960,6 +5960,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
            }
        }
 
+       /**
+        * @return Update result.
+        */
+       public GridCacheUpdateAtomicResult updateResult() {
+           return updateRes;
+       }
+
        /** {@inheritDoc} */
        @Nullable @Override public CacheDataRow oldRow() {
            return oldRow;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 870d99f..9455c2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -200,7 +201,7 @@ public interface IgniteCacheOffheapManager {
     public void invokeAll(GridCacheContext cctx,
         GridDhtLocalPartition part,
         Collection<? extends CacheSearchRow> rows,
-        Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
+        Function<CacheSearchRow, OffheapInvokeClosure> closures)
         throws IgniteCheckedException;
 
     /**
@@ -899,7 +900,7 @@ public interface IgniteCacheOffheapManager {
         * @return Cache search row.
         * @throws IgniteCheckedException If failed.
         */
-        public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+        public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException;
 
        /**
        * @return Rows comparator.
@@ -915,7 +916,7 @@ public interface IgniteCacheOffheapManager { */ public void invokeAll(GridCacheContext cctx, Collection<? extends CacheSearchRow> rows, - Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map) + Function<CacheSearchRow, OffheapInvokeClosure> closures) throws IgniteCheckedException; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 3ae5b81..0a621dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import javax.cache.Cache; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -80,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.processors.cache.tree.RowLinkIO; import org.apache.ignite.internal.processors.cache.tree.SearchRow; +import org.apache.ignite.internal.processors.cache.tree.SearchRowEx; import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow; import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow; import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult; @@ -456,9 +458,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheContext cctx, GridDhtLocalPartition part, Collection<? extends CacheSearchRow> rows, - Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map) + Function<CacheSearchRow, OffheapInvokeClosure> closures) throws IgniteCheckedException { - dataStore(part).invokeAll(cctx, rows, map); + dataStore(part).invokeAll(cctx, rows, closures); } /** {@inheritDoc} */ @@ -1650,8 +1652,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public SearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) { - return new SearchRow(grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID, key); + @Override public SearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + return data != null ? new SearchRowEx<>(cacheId, key, data) : new SearchRow(cacheId, key); } /** {@inheritDoc} */ @@ -1660,7 +1664,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void invokeAll(GridCacheContext cctx, Collection<? extends CacheSearchRow> rows, Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map) throws IgniteCheckedException { + @Override public void invokeAll(GridCacheContext cctx, + Collection<? 
extends CacheSearchRow> rows, + Function<CacheSearchRow, OffheapInvokeClosure> closures) + throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -1668,16 +1675,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager RuntimeException err = null; try { - dataTree.invokeAll(rows.iterator(), CacheDataRowAdapter.RowData.NO_KEY, map::get); + dataTree.invokeAll(rows.iterator(), CacheDataRowAdapter.RowData.NO_KEY, closures); } catch (UnregisteredClassException | UnregisteredBinaryTypeException clsErr) { err = clsErr; } - for (Map.Entry<? extends CacheSearchRow, ? extends OffheapInvokeClosure> e : map.entrySet()) { + for (CacheSearchRow row : rows) { + OffheapInvokeClosure c = closures.apply(row); + // Update could be interrupted in the middle, finish update only for processed entries. - if (e.getValue().operationType() != null) - finishInvoke(cctx, e.getKey().key(), e.getValue()); + if (c.operationType() != null) + finishInvoke(cctx, row.key(), c); } if (err != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 329165a..cf23e25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheBatchUpdateClosure; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheUpdateClosure; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; @@ -95,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.tree.SearchRowEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; @@ -1760,6 +1762,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { DhtAtomicUpdateResult updDhtRes = new DhtAtomicUpdateResult(); try { + Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = null; + + if (req.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + // (conflict resolver should be used for local store) + !ctx.dr().receiveEnabled() // and no DR. 
+ ) { + byPart = new HashMap<>(); + } + while (true) { try { GridDhtPartitionTopology top = topology(); @@ -1855,7 +1868,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - update(node, locked, req, res, updDhtRes); + update(node, locked, req, res, updDhtRes, byPart); dhtFut = updDhtRes.dhtFuture(); deleted = updDhtRes.deleted(); @@ -1964,15 +1977,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Request. * @param res Response. * @param dhtUpdRes DHT update result - * @return Operation result. * @throws GridCacheEntryRemovedException If got obsolete entry. */ - private DhtAtomicUpdateResult update( + private void update( ClusterNode node, List<GridDhtCacheEntry> locked, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, - DhtAtomicUpdateResult dhtUpdRes) + DhtAtomicUpdateResult dhtUpdRes, + @Nullable Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart + ) throws GridCacheEntryRemovedException { GridDhtPartitionTopology top = topology(); @@ -2006,12 +2020,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.group().listenerLock().readLock().lock(); try { - if (req.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - // (conflict resolver should be used for local store) - !ctx.dr().receiveEnabled() // and no DR. - ) { + if (byPart != null) { // This method can only be used when there are no replicated entries in the batch. updateWithBatch(node, hasNear, @@ -2023,7 +2032,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, expiry, sndPrevVal, - dhtUpdRes); + dhtUpdRes, + byPart); if (req.operation() == TRANSFORM) retVal = dhtUpdRes.returnValue(); @@ -2077,8 +2087,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } dhtUpdRes.expiryPolicy(expiry); - - return dhtUpdRes; } /** @@ -2109,7 +2117,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, final boolean sndPrevVal, - final DhtAtomicUpdateResult dhtUpdRes + final DhtAtomicUpdateResult dhtUpdRes, + final Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -2270,22 +2279,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Update previous batch. if (putMap != null) { updatePartialBatch( - hasNear, firstEntryIdx, filtered, ver, - node, writeVals, putMap, null, - entryProcessorMap, req, res, - replicate, dhtUpdRes, - taskName, expiry, - sndPrevVal); + byPart); firstEntryIdx = i; @@ -2317,22 +2321,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Update previous batch. if (rmvKeys != null) { updatePartialBatch( - hasNear, firstEntryIdx, filtered, ver, - node, null, null, rmvKeys, - entryProcessorMap, req, res, - replicate, dhtUpdRes, - taskName, expiry, - sndPrevVal); + byPart); firstEntryIdx = i; @@ -2440,27 +2439,65 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Store final batch. 
if (putMap != null || rmvKeys != null) { updatePartialBatch( - hasNear, firstEntryIdx, filtered, ver, - node, writeVals, putMap, rmvKeys, - entryProcessorMap, req, res, - replicate, dhtUpdRes, - taskName, expiry, - sndPrevVal); + byPart); } else assert filtered.isEmpty(); dhtUpdRes.returnValue(invokeRes); + + AffinityAssignment affAssignment = ctx.affinity().assignment(req.topologyVersion()); + + GridDrType drType = replicate ? DR_PRIMARY : DR_NONE; + + for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) { + try { + Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = e0.getValue(); + + ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get); + + for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) { + AtomicCacheBatchUpdateClosure c = e.getValue(); + + updateSingleEntryPartialBatch( + (GridDhtCacheEntry)c.entry(), + c, + ver, + (CacheObject)c.writeValue(), + entryProcessorMap, + node, + hasNear, + taskName, + expiry, + drType, + req, + res, + dhtUpdRes, + c.reqIdx, + affAssignment, + sndPrevVal); + } + } + catch (GridCacheEntryRemovedException e) { + assert false : "Entry cannot become obsolete while holding lock."; + + e.printStackTrace(); + } + catch (IgniteCheckedException e) { + for (CacheSearchRow row : e0.getValue().keySet()) + res.addFailedKey(row.key(), e); + } + } } /** @@ -2614,7 +2651,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.disableTriggeringCacheInterceptorOnConflict() ); - map.put(dataStore.createSearchRow(ctx, entry.key()), c); + map.put(dataStore.createSearchRow(ctx, entry.key(), null), c); } catch (IgniteCheckedException e) { res.addFailedKey(entry.key(), e); @@ -2628,7 +2665,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = e0.getValue(); try { - ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map); + ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get); } catch (UnregisteredClassException | UnregisteredBinaryTypeException e) { err = e; @@ -2915,22 +2952,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param sndPrevVal If {@code true} sends previous value to backups. */ private void updatePartialBatch( - final boolean hasNear, final int firstEntryIdx, final List<GridDhtCacheEntry> entries, final GridCacheVersion ver, - final ClusterNode nearNode, @Nullable final List<CacheObject> writeVals, @Nullable final Map<KeyCacheObject, CacheObject> putMap, @Nullable final Collection<KeyCacheObject> rmvKeys, - @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, - final boolean replicate, final DhtAtomicUpdateResult dhtUpdRes, - final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, - final boolean sndPrevVal + Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart ) { assert putMap == null ^ rmvKeys == null; @@ -2971,17 +3003,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op = DELETE; } - AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); - - GridDrType drType = replicate ? 
DR_PRIMARY : DR_NONE; - Collection<Object> failedToUnwrapKeys = null; int cnt = entries.size(); - Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = - cnt > 1 ? new HashMap<>() : null; - // Avoid iterator creation. for (int i = 0; i < cnt; i++) { GridDhtCacheEntry entry = entries.get(i); @@ -3015,119 +3040,49 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { continue; } - try { - // We are holding java-level locks on entries at this point. - CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null; + // We are holding java-level locks on entries at this point. + CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null; - assert writeVal != null || op == DELETE : "null write value found."; + assert writeVal != null || op == DELETE : "null write value found."; - if (byPart != null) { - GridDhtLocalPartition part = entry.localPartition(); + GridDhtLocalPartition part = entry.localPartition(); - TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = byPart.get(part); + TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = byPart.get(part); - IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part); + IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part); - if (map == null) - byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator())); + if (map == null) + byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator())); - AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure( - firstEntryIdx + i, - entry, - topVer, - ver, - op, - writeVal, - req.invokeArguments(), - /*read-through*/false, - /*write-through*/false, - req.keepBinary(), - expiry, - /*primary*/true, - /*verCheck*/false, - req.filter(), - CU.TTL_NOT_CHANGED, - CU.EXPIRE_TIME_CALCULATE, - null, - /*conflictResolve*/false, - false, - null, - ctx.disableTriggeringCacheInterceptorOnConflict() - ); - - map.put(dataStore.createSearchRow(ctx, entry.key()), c); - } - else { - updateSingleEntryPartialBatch(entry, - null, - op, - ver, - writeVal, - entryProcessorMap, - nearNode, - hasNear, - taskName, - expiry, - drType, - req, - res, - dhtUpdRes, - i + firstEntryIdx, - affAssignment, - sndPrevVal); - } - } - catch (GridCacheEntryRemovedException e) { - assert false : "Entry cannot become obsolete while holding lock."; + AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure( + firstEntryIdx + i, + entry, + topVer, + ver, + op, + writeVal, + req.invokeArguments(), + /*read-through*/false, + /*write-through*/false, + req.keepBinary(), + expiry, + /*primary*/true, + /*verCheck*/false, + req.filter(), + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + /*conflictResolve*/false, + false, + null, + ctx.disableTriggeringCacheInterceptorOnConflict() + ); - e.printStackTrace(); - } + map.put(dataStore.createSearchRow(ctx, entry.key(), null), c); dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1); } - if (byPart != null) { - for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) { - try { - Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = e0.getValue(); - - ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map); - - for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) { - AtomicCacheBatchUpdateClosure c = e.getValue(); - - updateSingleEntryPartialBatch( - (GridDhtCacheEntry)c.entry(), - c, - op, - ver, - (CacheObject)c.writeValue(), - entryProcessorMap, - 
nearNode, - hasNear, - taskName, - expiry, - drType, - req, - res, - dhtUpdRes, - c.reqIdx, - affAssignment, - sndPrevVal); - } - } - catch (GridCacheEntryRemovedException e) { - assert false : "Entry cannot become obsolete while holding lock."; - - e.printStackTrace(); - } - catch (IgniteCheckedException e) { - for (CacheSearchRow row : e0.getValue().keySet()) - res.addFailedKey(row.key(), e); - } - } - } - if (failedToUnwrapKeys != null) { log.warning("Failed to get values of keys: " + failedToUnwrapKeys + " (the binary objects will be used instead)."); @@ -3151,7 +3106,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private void updateSingleEntryPartialBatch( GridDhtCacheEntry entry, @Nullable AtomicCacheBatchUpdateClosure c, - GridCacheOperation op, GridCacheVersion ver, @Nullable final CacheObject writeVal, @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @@ -3177,7 +3131,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Get readers before innerUpdate (reader cleared after remove). GridDhtCacheEntry.ReaderId[] readers = c != null ? c.rdrs : entry.readersLocked(); - assert writeVal != null || op == DELETE : "null write value found."; + GridCacheOperation op = writeVal == null ? DELETE : UPDATE; GridCacheUpdateAtomicResult updRes = entry.innerUpdate( c, @@ -3583,135 +3537,449 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sendNearUpdateReply(nodeId, res); } - /** - * @param nodeId Sender node ID. - * @param req Dht atomic update request. - */ - private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { - assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName(); + private void dhtAtomicUpdateRequestUpdateBatch( + UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + @Nullable GridDhtAtomicNearResponse nearRes, + String taskName, + boolean writeThrough, + boolean intercept + ) throws NodeStoppingException { + int curPart = -1; - if (msgLog.isDebugEnabled()) { - msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + int batchStart = 0; + int batchSize = 0; - assert req.partition() >= 0 : req; + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); - GridCacheVersion ver = req.writeVersion(); + int part = key.partition(); - GridDhtAtomicNearResponse nearRes = null; + assert part >= 0 : key; - if (req.nearNodeId() != null) { - nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), - req.partition(), - req.nearFutureId(), + if (curPart >=0 && curPart != part) { + dhtAtomicUpdateRequestUpdateBatch( + batchStart, + batchSize, + nodeId, + req, + nearRes, + taskName, + writeThrough, + intercept); + + batchStart = i; + batchSize = 1; + } + else + batchSize++; + + curPart = part; + } + + if (batchSize > 0) { + dhtAtomicUpdateRequestUpdateBatch( + batchStart, + batchSize, nodeId, - req.flags()); + req, + nearRes, + taskName, + writeThrough, + intercept); } + } - boolean replicate = ctx.isDrEnabled(); + @Nullable private List<SearchRowEx<AtomicCacheUpdateClosure>> dhtAtomicUpdateLockBatch( + final int batchStart, + final int batchSize, + final GridDhtAtomicAbstractUpdateRequest req, + boolean writeThrough, + boolean intercept + ) throws IgniteCheckedException { + try { + List<SearchRowEx<AtomicCacheUpdateClosure>> locked = new ArrayList<>(batchSize); - boolean 
intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; + SearchRowEx<AtomicCacheUpdateClosure> prev = null; - String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + GridDhtLocalPartition part = null; - ctx.shared().database().checkpointReadLock(); + while (true) { + for (int i = 0; i < batchSize; i++) { + int idx = batchStart + i; - try { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); + KeyCacheObject key = req.key(idx); - try { - while (true) { - GridDhtCacheEntry entry = null; + GridDhtCacheEntry entry = entryExx(key); - try { - entry = entryExx(key); + if (part == null) + part = entry.localPartition(); + else + assert part == entry.localPartition(); - CacheObject val = req.value(i); - CacheObject prevVal = req.previousValue(i); + CacheObject val = req.value(idx); - EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); - Long updateIdx = req.updateCounter(i); + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); + Long updateIdx = req.updateCounter(idx); - GridCacheOperation op = entryProcessor != null ? TRANSFORM : - (val != null) ? UPDATE : DELETE; + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; - long ttl = req.ttl(i); - long expireTime = req.conflictExpireTime(i); + long ttl = req.ttl(idx); + long expireTime = req.conflictExpireTime(idx); - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( - null, - ver, - nodeId, - nodeId, - op, - op == TRANSFORM ? entryProcessor : val, - op == TRANSFORM ? req.invokeArguments() : null, - /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) - && writeThrough() && !req.skipStore(), - /*read-through*/false, - /*retval*/false, - req.keepBinary(), - /*expiry policy*/null, - /*event*/true, - /*metrics*/true, - /*primary*/false, - /*check version*/!req.forceTransformBackups(), - req.topologyVersion(), - CU.empty0(), - replicate ? DR_BACKUP : DR_NONE, - ttl, - expireTime, - req.conflictVersion(i), - false, - intercept, - req.subjectId(), - taskName, - prevVal, - updateIdx, - null, - req.transformOperation()); + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + entry, + req.topologyVersion(), + req.writeVersion(), + op, + op == TRANSFORM ? entryProcessor : val, + op == TRANSFORM ? req.invokeArguments() : null, + /*read-through*/false, + writeThrough, + req.keepBinary(), + /*expiry policy*/null, + /*primary*/false, + /*check version*/!req.forceTransformBackups(), + CU.empty0(), + ttl, + expireTime, + req.conflictVersion(i), + false, + intercept, + updateIdx, + ctx.disableTriggeringCacheInterceptorOnConflict()); - if (updRes.removeVersion() != null) - ctx.onDeferredDelete(entry, updRes.removeVersion()); + SearchRowEx<AtomicCacheUpdateClosure> row = (SearchRowEx<AtomicCacheUpdateClosure>) + part.dataStore().createSearchRow(ctx, key, c); - entry.onUnlock(); + locked.add(row); - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry while updating backup value (will retry): " + key); + // Expect keys in request are already sorted. 
+ assert prev == null || part.dataStore().rowsComparator().compare(row, prev) >= 0; - entry = null; - } - finally { - if (entry != null) - entry.touch(); - } + prev = row; + } + + boolean retry = false; + + for (int i = 0; i < locked.size(); i++) { + SearchRowEx<AtomicCacheUpdateClosure> row = locked.get(i); + + GridCacheMapEntry entry = row.data().entry(); + + entry.lockEntry(); + + if (entry.obsolete()) { + // Unlock all locked. + for (int j = 0; j <= i; j++) + locked.get(j).data().entry().unlockEntry(); + + // Clear entries. + locked.clear(); + + // Retry. + retry = true; + + break; } } - catch (NodeStoppingException e){ - U.warn(log, "Failed to update key on backup (local node is stopping): " + key); - return; + if (!retry) + return locked; + } + } + catch (GridDhtInvalidPartitionException e) { + // Ignore, do not need update backup. + return null; + } + } + + private static AtomicCacheUpdateClosure rowClosure(CacheSearchRow row) { + return ((SearchRowEx<AtomicCacheUpdateClosure>)row).data(); + } + + private void dhtAtomicUpdateRequestUpdateBatch( + final int batchStart, + final int batchSize, + final UUID nodeId, + final GridDhtAtomicAbstractUpdateRequest req, + final @Nullable GridDhtAtomicNearResponse nearRes, + final String taskName, + final boolean writeThrough, + final boolean intercept + ) throws NodeStoppingException { + assert batchSize > 0 : batchSize; + assert batchStart >= 0 && batchStart < req.size() : batchStart; + + if (batchSize == 1) { + dhtAtomicUpdateRequestUpdateSingle( + nodeId, + req, + nearRes, + taskName, + writeThrough, + intercept, + batchStart); + + return; + } + + try { + ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); + + List<SearchRowEx<AtomicCacheUpdateClosure>> locked = dhtAtomicUpdateLockBatch( + batchStart, + batchSize, + req, + writeThrough, + intercept); + + if (locked == null) + return; + + assert !locked.isEmpty(); + + IgniteCacheOffheapManager.CacheDataStore dataStore = ((GridDhtCacheEntry)locked.get(0).data().entry()).localPartition().dataStore(); + + try { + dataStore.invokeAll(ctx, locked, GridDhtAtomicCache::rowClosure); + + for (int i = 0; i < batchSize; i++) { + AtomicCacheUpdateClosure c = locked.get(i).data(); + + dhtAtomicUpdateRequestUpdateSingleEntry( + c.entry(), + c, + nodeId, + req, + nearRes, + taskName, + writeThrough, + intercept, + batchStart + i); } - catch (GridDhtInvalidPartitionException ignored) { - // Ignore. 
+ } + finally { + for (int i = 0; i < batchSize; i++) + locked.get(i).data().entry().unlockEntry(); + + for (int i = 0; i < batchSize; i++) { + AtomicCacheUpdateClosure c = locked.get(i).data(); + + if (c.updateResult().removeVersion() != null) + ctx.onDeferredDelete(c.entry(), c.updateResult().removeVersion()); + + GridCacheMapEntry entry = locked.get(i).data().entry(); + + entry.onUnlock(); + + entry.touch(); } - catch (IgniteCheckedException|RuntimeException e) { - if(e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class)) - throw (RuntimeException)e; + } + } + catch (GridCacheEntryRemovedException e) { + assert false; + } + catch (NodeStoppingException e) { + throw e; + } + catch (IgniteCheckedException | RuntimeException e) { + if (e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class)) + throw (RuntimeException)e; + + for (int i = 0; i < batchSize; i++) { + KeyCacheObject key = req.key(batchStart + i); + + IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e); + + if (nearRes != null) + nearRes.addFailedKey(key, err); + } + + U.error(log, "Failed to update keys on backup node", e); + } + } + + private GridCacheUpdateAtomicResult dhtAtomicUpdateRequestUpdateSingleEntry( + GridCacheMapEntry entry, + AtomicCacheUpdateClosure c, + UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + @Nullable GridDhtAtomicNearResponse nearRes, + String taskName, + boolean writeThrough, + boolean intercept, + int idx + ) throws NodeStoppingException, GridCacheEntryRemovedException { + CacheObject val = req.value(idx); + CacheObject prevVal = req.previousValue(idx); + + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(idx); + Long updateIdx = req.updateCounter(idx); + + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; + + long ttl = req.ttl(idx); + long expireTime = req.conflictExpireTime(idx); + + try { + return entry.innerUpdate( + c, + req.writeVersion(), + nodeId, + nodeId, + op, + op == TRANSFORM ? entryProcessor : val, + op == TRANSFORM ? req.invokeArguments() : null, + writeThrough, + /*read-through*/false, + /*retval*/false, + req.keepBinary(), + /*expiry policy*/null, + /*event*/true, + /*metrics*/true, + /*primary*/false, + /*check version*/!req.forceTransformBackups(), + req.topologyVersion(), + CU.empty0(), + ctx.isDrEnabled() ? 
DR_BACKUP : DR_NONE, + ttl, + expireTime, + req.conflictVersion(idx), + false, + intercept, + req.subjectId(), + taskName, + prevVal, + updateIdx, + null, + req.transformOperation()); + } + catch (NodeStoppingException e) { + throw e; + } + catch (IgniteCheckedException | RuntimeException e) { + if(e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class)) + throw (RuntimeException)e; + + KeyCacheObject key = entry.key(); + + IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e); + + if (nearRes != null) + nearRes.addFailedKey(key, err); + + U.error(log, "Failed to update key on backup node: " + key, e); + + return null; + } + } + + private void dhtAtomicUpdateRequestUpdateSingle( + UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + @Nullable GridDhtAtomicNearResponse nearRes, + String taskName, + boolean writeThrough, + boolean intercept, + int i) throws NodeStoppingException + { + KeyCacheObject key = req.key(i); + + try { + while (true) { + GridDhtCacheEntry entry = null; - IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e); + try { + entry = entryExx(key); + + GridCacheUpdateAtomicResult updRes = dhtAtomicUpdateRequestUpdateSingleEntry( + entry, + null, + nodeId, + req, + nearRes, + taskName, + writeThrough, + intercept, + i); - if (nearRes != null) - nearRes.addFailedKey(key, err); + if (updRes != null && updRes.removeVersion() != null) + ctx.onDeferredDelete(entry, updRes.removeVersion()); - U.error(log, "Failed to update key on backup node: " + key, e); + entry.onUnlock(); + + break; // While. } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while updating backup value (will retry): " + key); + + entry = null; + } + finally { + if (entry != null) + entry.touch(); + } + } + } + catch (GridDhtInvalidPartitionException ignored) { + // Ignore. + } + } + + /** + * @param nodeId Sender node ID. + * @param req Dht atomic update request. 
+ */ + private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { + assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName(); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } + + assert req.partition() >= 0 : req; + + GridDhtAtomicNearResponse nearRes = null; + + if (req.nearNodeId() != null) { + nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + req.partition(), + req.nearFutureId(), + nodeId, + req.flags()); + } + + String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + + boolean writeThrough = (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) + && writeThrough() && !req.skipStore(); + + boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; + + ctx.shared().database().checkpointReadLock(); + + try { + if (req.size() > 1) { + dhtAtomicUpdateRequestUpdateBatch( + nodeId, + req, + nearRes, + taskName, + writeThrough, + intercept); } + else { + for (int i = 0; i < req.size(); i++) + dhtAtomicUpdateRequestUpdateSingle(nodeId, req, nearRes, taskName, writeThrough, intercept, i); + } + } + catch (NodeStoppingException e){ + U.warn(log, "Failed to update key on backup (local node is stopping)"); + + return; } finally { ctx.shared().database().checkpointReadUnlock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index a15ff0d..57d3fd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -2103,10 +2104,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { + @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.createSearchRow(cctx, key); + return delegate.createSearchRow(cctx, key, data); } /** {@inheritDoc} */ @@ -2119,12 +2120,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void invokeAll(GridCacheContext cctx, Collection<? extends CacheSearchRow> rows, - Map<? extends CacheSearchRow, ? 
extends OffheapInvokeClosure> map) throws IgniteCheckedException { + Function<CacheSearchRow, OffheapInvokeClosure> closures) throws IgniteCheckedException { assert ctx.database().checkpointLockIsHeldByThread(); CacheDataStore delegate = init0(false); - delegate.invokeAll(cctx, rows, map); + delegate.invokeAll(cctx, rows, closures); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index 63a3d2c..d794840 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -1844,7 +1844,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param closures Provider of closures for each search row. * @throws IgniteCheckedException If failed. */ - public void invokeAll(Iterator<? extends L> sortedRows, Object z, Function<L, InvokeClosure<T>> closures) + public void invokeAll(Iterator<? extends L> sortedRows, Object z, Function<L, ? extends InvokeClosure<T>> closures) throws IgniteCheckedException { doInvoke(new InvokeAll(sortedRows.next(), sortedRows, z, closures)); } @@ -4035,7 +4035,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements Iterator<? extends L> sortedRows; /** */ - Function<L, InvokeClosure<T>> closures; + Function<L, ? extends InvokeClosure<T>> closures; /** */ ReuseBag reuseBag; @@ -4046,7 +4046,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param x Implementation specific argument. * @param clo Closure. */ - InvokeAll(L firstRow, Iterator<? extends L> sortedRows, Object x, Function<L, InvokeClosure<T>> closures) { + InvokeAll(L firstRow, Iterator<? extends L> sortedRows, Object x, Function<L, ? extends InvokeClosure<T>> closures) { super(firstRow, x, closures.apply(firstRow)); this.sortedRows = sortedRows; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java new file mode 100644 index 0000000..469e7ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRowEx.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ *
+ */
+public class SearchRowEx<T> extends SearchRow {
+    /** */
+    private final T data;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     */
+    public SearchRowEx(int cacheId, KeyCacheObject key, T data) {
+        super(cacheId, key);
+
+        this.data = data;
+    }
+
+    /**
+     * @return Data.
+     */
+    public T data() {
+        return data;
+    }
+}
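
SearchRowEx is a thin decorator over SearchRow that attaches arbitrary per-row data. On the backup path it carries each row's update closure inside the sorted row itself (see rowClosure above), so one sorted list serves both the tree pass and the finish pass. The following is a self-contained toy model of that pattern only; Row, RowEx and the key-based comparator are simplified stand-ins, not the Ignite classes shown in the diff.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Toy model of a search row that also carries per-row payload (e.g. its closure). */
public class RowWithDataSketch {
    /** Simplified stand-in for a search row. */
    static class Row {
        final int key;

        Row(int key) { this.key = key; }
    }

    /** Simplified stand-in for SearchRowEx<T>: same ordering as Row, plus attached data. */
    static class RowEx<T> extends Row {
        private final T data;

        RowEx(int key, T data) {
            super(key);

            this.data = data;
        }

        T data() { return data; }
    }

    public static void main(String[] args) {
        // Build rows, each carrying its own closure-like payload (here just a label).
        List<RowEx<String>> rows = new ArrayList<>();

        for (int key : new int[] {7, 1, 4})
            rows.add(new RowEx<>(key, "closure-for-" + key));

        // Sort once, as the batch paths expect rows ordered by the row comparator.
        rows.sort(Comparator.comparingInt(r -> r.key));

        // The closure lookup is then just "unwrap the row", mirroring GridDhtAtomicCache::rowClosure.
        Function<Row, String> closures = row -> ((RowEx<String>)row).data();

        for (RowEx<String> row : rows)
            System.out.println(row.key + " -> " + closures.apply(row));
    }
}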
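The reworked invokeAll contract replaces the per-partition Map argument with a Function<CacheSearchRow, OffheapInvokeClosure> lookup: callers keep one collection of rows sorted by the data store's rowsComparator(), the tree resolves each row's closure during a single pass, and a finish pass afterwards touches only rows whose closure actually ran (operationType() != null). The sketch below is a minimal, self-contained model of that calling shape under those assumptions; Row, InvokeClosure, treeInvokeAll and BatchInvokeSketch are illustrative stand-ins for CacheSearchRow, OffheapInvokeClosure and BPlusTree.invokeAll, not the real API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Function;

/** Toy model of the Function-based batch invoke pattern behind the new invokeAll contract. */
public class BatchInvokeSketch {
    /** Simplified stand-in for CacheSearchRow. */
    static class Row {
        final int key;

        Row(int key) { this.key = key; }
    }

    /** Simplified stand-in for OffheapInvokeClosure: records whether it was actually applied. */
    static class InvokeClosure {
        Boolean applied; // null until the tree pass reaches this row

        void call(Row row) { applied = true; }
    }

    /** Stand-in for the single tree pass: walks sorted rows once, resolving each closure via the function. */
    static void treeInvokeAll(Iterable<Row> sortedRows, Function<Row, InvokeClosure> closures) {
        for (Row row : sortedRows)
            closures.apply(row).call(row);
    }

    public static void main(String[] args) {
        Comparator<Row> cmp = Comparator.comparingInt(r -> r.key); // stand-in for rowsComparator()

        // Callers keep rows sorted (TreeMap) and pass map::get as the closure lookup,
        // the same shape the primary-side batch path uses in GridDhtAtomicCache.
        TreeMap<Row, InvokeClosure> byRow = new TreeMap<>(cmp);

        for (int key : new int[] {7, 1, 4})
            byRow.put(new Row(key), new InvokeClosure());

        treeInvokeAll(byRow.keySet(), byRow::get);

        // Finish pass: the update could be interrupted in the middle,
        // so only rows whose closure actually ran are finished.
        List<Integer> finished = new ArrayList<>();

        for (Row row : byRow.keySet()) {
            InvokeClosure c = byRow.get(row);

            if (c.applied != null)
                finished.add(row.key);
        }

        System.out.println("Finished keys (sorted): " + finished);
    }
}

Keeping the rows pre-sorted is what makes a single tree traversal possible; the TreeMap ordered by the data store comparator provides that ordering before invokeAll is called.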
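On the backup side, processDhtAtomicUpdateRequest now walks the request keys and hands contiguous runs of keys belonging to the same partition to dhtAtomicUpdateRequestUpdateBatch, falling back to the single-entry path for runs of size one. Below is a self-contained sketch of just that batching loop, assuming a trivial stand-in partitionOf mapping and a processBatch placeholder instead of the request and closure machinery.

import java.util.Arrays;

/** Toy model of the contiguous per-partition batching loop on the backup update path. */
public class PartitionBatchSketch {
    /** Stand-in for key.partition(): here, a trivial modulo mapping. */
    static int partitionOf(int key) {
        return key % 4;
    }

    /** Stand-in for dhtAtomicUpdateRequestUpdateBatch(batchStart, batchSize, ...). */
    static void processBatch(int[] keys, int batchStart, int batchSize) {
        System.out.println("batch start=" + batchStart + " size=" + batchSize +
            " keys=" + Arrays.toString(Arrays.copyOfRange(keys, batchStart, batchStart + batchSize)));
    }

    public static void main(String[] args) {
        // Keys are expected to arrive grouped by partition (the real code asserts they are sorted).
        int[] keys = {0, 4, 8, 1, 5, 2};

        int curPart = -1;
        int batchStart = 0;
        int batchSize = 0;

        for (int i = 0; i < keys.length; i++) {
            int part = partitionOf(keys[i]);

            if (curPart >= 0 && curPart != part) {
                // Partition changed: flush the previous contiguous batch and start a new one.
                processBatch(keys, batchStart, batchSize);

                batchStart = i;
                batchSize = 1;
            }
            else
                batchSize++;

            curPart = part;
        }

        // Flush the tail batch.
        if (batchSize > 0)
            processBatch(keys, batchStart, batchSize);
    }
}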