This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new fc3b142 invokeAll fc3b142 is described below commit fc3b14227acdbef8dbeee799a1d430b81b352a8c Author: sboikov <sboi...@apache.org> AuthorDate: Mon Feb 25 19:41:10 2019 +0300 invokeAll --- .../processors/cache/GridCacheEntryEx.java | 13 +- .../processors/cache/GridCacheMapEntry.java | 120 +------ .../distributed/dht/atomic/GridDhtAtomicCache.java | 389 ++++++++++----------- .../distributed/near/GridNearAtomicCache.java | 81 +++-- .../processors/cache/GridCacheTestEntryEx.java | 19 +- 5 files changed, 248 insertions(+), 374 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index b93e7f8..85ec6a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -551,28 +551,17 @@ public interface GridCacheEntryEx { */ public GridCacheUpdateAtomicResult innerUpdate( @Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c, - GridCacheVersion ver, + boolean updateOffheap, UUID evtNodeId, UUID affNodeId, GridCacheOperation op, @Nullable Object val, - @Nullable Object[] invokeArgs, - boolean writeThrough, - boolean readThrough, boolean retval, - boolean keepBinary, - @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, - boolean checkVer, AffinityTopologyVersion topVer, - @Nullable CacheEntryPredicate[] filter, GridDrType drType, - long conflictTtl, - long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean conflictResolve, boolean intercept, @Nullable UUID subjId, String taskName, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index acd4349..c326924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2207,28 +2207,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult innerUpdate( AtomicCacheUpdateClosure c, - final GridCacheVersion newVer, + boolean updateStore, final UUID evtNodeId, final UUID affNodeId, final GridCacheOperation op, @Nullable final Object writeObj, - @Nullable final Object[] invokeArgs, - final boolean writeThrough, - final boolean readThrough, final boolean retval, - final boolean keepBinary, - @Nullable final IgniteCacheExpiryPolicy expiryPlc, final boolean evt, final boolean metrics, final boolean primary, - final boolean verCheck, final AffinityTopologyVersion topVer, - @Nullable final CacheEntryPredicate[] filter, final GridDrType drType, - final long explicitTtl, - final long explicitExpireTime, - @Nullable final GridCacheVersion conflictVer, - final boolean conflictResolve, final boolean intercept, @Nullable final UUID subjId, final String taskName, @@ -2255,35 +2244,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false); boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM - || !F.isEmptyOrNulls(filter); - - // Possibly read value from store. - boolean readFromStore = readThrough && needVal && (cctx.readThrough() && - (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); - - if (c == null) { - c = new AtomicCacheUpdateClosure(this, - topVer, - newVer, - op, - writeObj, - invokeArgs, - readFromStore, - writeThrough, - keepBinary, - expiryPlc, - primary, - verCheck, - filter, - explicitTtl, - explicitExpireTime, - conflictVer, - conflictResolve, - intercept, - updateCntr, - cctx.disableTriggeringCacheInterceptorOnConflict() - ); + || !F.isEmptyOrNulls(c.filter); + if (updateStore) { key.valueBytes(cctx.cacheObjectContext()); if (isNear()) { @@ -2328,12 +2291,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme (EntryProcessor<Object, Object, ?>)writeObj; CacheInvokeEntry<Object, Object> entry = - new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); + new CacheInvokeEntry<>(key, prevVal, version(), c.keepBinary, this); IgniteThread.onEntryProcessorEntered(true); try { - entryProcessor.process(entry, invokeArgs); + entryProcessor.process(entry, c.invokeArgs); evtVal = entry.modified() ? cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; @@ -2401,7 +2364,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, transformClo.getClass().getName(), taskName, - keepBinary); + c.keepBinary); } if (c.op == UPDATE) { @@ -2431,7 +2394,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName, - keepBinary); + c.keepBinary); } } else { @@ -2459,7 +2422,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName, - keepBinary); + c.keepBinary); } } @@ -2493,7 +2456,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, c.op == UPDATE ? updateVal : oldVal, null, - keepBinary, + c.keepBinary, c.updateRes.updateCounter() ); @@ -5852,7 +5815,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final boolean keepBinary; /** */ - private final IgniteCacheExpiryPolicy expiryPlc; + public final IgniteCacheExpiryPolicy expiryPlc; /** */ private final boolean primary; @@ -5902,8 +5865,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** Disable interceptor invocation onAfter* methods flag. */ private boolean wasIntercepted; + /** Index in update request. */ + public final int reqIdx; + /** */ public AtomicCacheUpdateClosure( + int reqIdx, GridCacheMapEntry entry, AffinityTopologyVersion topVer, GridCacheVersion newVer, @@ -5926,6 +5893,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean skipInterceptorOnConflict) { assert op == UPDATE || op == DELETE || op == TRANSFORM : op; + this.reqIdx = reqIdx; this.entry = entry; this.topVer = topVer; this.newVer = newVer; @@ -6733,64 +6701,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - /** - * - */ - public static class AtomicCacheBatchUpdateClosure extends AtomicCacheUpdateClosure { - /** Index in update request. */ - public final int reqIdx; - - /** Readers before clusure execution (reader cleared after remove). */ - public final GridDhtCacheEntry.ReaderId[] rdrs; - - public AtomicCacheBatchUpdateClosure( - int reqIdx, - GridDhtCacheEntry entry, - AffinityTopologyVersion topVer, - GridCacheVersion newVer, - GridCacheOperation op, - Object writeObj, - Object[] invokeArgs, - boolean readThrough, - boolean writeThrough, - boolean keepBinary, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean primary, - boolean verCheck, - @Nullable CacheEntryPredicate[] filter, - long explicitTtl, - long explicitExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean conflictResolve, - boolean intercept, - @Nullable Long updateCntr, - boolean skipInterceptorOnConflict) { - super(entry, - topVer, - newVer, - op, - writeObj, - invokeArgs, - readThrough, - writeThrough, - keepBinary, - expiryPlc, - primary, - verCheck, - filter, - explicitTtl, - explicitExpireTime, - conflictVer, - conflictResolve, - intercept, - updateCntr, - skipInterceptorOnConflict); - - this.reqIdx = reqIdx; - rdrs = entry.readersLocked(); - } - } - /** {@inheritDoc} */ @Override public GridCacheUpdateTxResult mvccUpdateRowsWithPreloadInfo( IgniteInternalTx tx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index de2451e..5f58e71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheBatchUpdateClosure; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheUpdateClosure; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -1767,7 +1766,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { !ctx.store().isLocal() && // and this is not local store (conflict resolver should be used for local store) !ctx.dr().receiveEnabled(); // and no DR. - Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = + Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart = batchStoreUpdate ? new HashMap<>() : null; while (true) { @@ -1982,7 +1981,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, DhtAtomicUpdateResult dhtUpdRes, - @Nullable Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart + @Nullable Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart ) throws GridCacheEntryRemovedException { @@ -2115,7 +2114,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final IgniteCacheExpiryPolicy expiry, final boolean sndPrevVal, final DhtAtomicUpdateResult dhtUpdRes, - final Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart + final Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -2457,30 +2456,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDrType drType = replicate ? DR_PRIMARY : DR_NONE; - for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) { + for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> e0 : byPart.entrySet()) { try { - Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = e0.getValue(); + Map<CacheSearchRow, AtomicCacheUpdateClosure> map = e0.getValue(); ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get); - for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) { - AtomicCacheBatchUpdateClosure c = e.getValue(); + for (Map.Entry<CacheSearchRow, AtomicCacheUpdateClosure> e : map.entrySet()) { + AtomicCacheUpdateClosure c = e.getValue(); updateSingleEntryPartialBatch( - (GridDhtCacheEntry)c.entry(), c, - ver, (CacheObject)c.writeValue(), entryProcessorMap, node, hasNear, taskName, - expiry, drType, req, res, dhtUpdRes, - c.reqIdx, affAssignment, sndPrevVal); } @@ -2589,148 +2584,137 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDrType drType = replicate ? DR_PRIMARY : DR_NONE; - if (req.size() > 1) { - Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false); + boolean batchUpdate = req.size() > 1; + + Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false); + + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || + !F.isEmptyOrNulls(req.filter()); - boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || - !F.isEmptyOrNulls(req.filter()); + Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart = + batchUpdate ? new HashMap<>() : null; - Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = new HashMap<>(); + BitSet retryEntries = dhtUpdRes.retryEntries(); - BitSet retryEntries = dhtUpdRes.retryEntries(); + if (retryEntries != null) + dhtUpdRes.retryEntries(null); - if (retryEntries != null) - dhtUpdRes.retryEntries(null); + for (int i = 0; i < req.size(); i++) { + GridDhtCacheEntry entry = locked.get(i); - for (int i = 0; i < req.size(); i++) { + try { if (retryEntries != null && !retryEntries.get(i)) continue; - GridDhtCacheEntry entry = locked.get(i); + // Possibly read value from store. + boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() && + (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue())); - try { - entry.key().valueBytes(ctx.cacheObjectContext()); + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + i, + entry, + topVer, + ver, + req.operation(), + op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i), + req.invokeArguments(), + readFromStore, + writeThrough() && !req.skipStore(), + req.keepBinary(), + expiry, + /*primary*/true, + /*verCheck*/false, + req.filter(), + req.conflictTtl(i), + req.conflictExpireTime(i), + req.conflictVersion(i), + /*conflictResolve*/true, + intercept, + null, + ctx.disableTriggeringCacheInterceptorOnConflict() + ); + + entry.key().valueBytes(ctx.cacheObjectContext()); + if (batchUpdate) { GridDhtLocalPartition part = entry.localPartition(); - TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = byPart.get(part); + TreeMap<CacheSearchRow, AtomicCacheUpdateClosure> map = byPart.get(part); IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part); if (map == null) byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator())); - // Possibly read value from store. - boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() && - (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue())); - - AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure( - i, - entry, - topVer, - ver, - req.operation(), - op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i), - req.invokeArguments(), - readFromStore, - writeThrough() && !req.skipStore(), - req.keepBinary(), - expiry, - /*primary*/true, - /*verCheck*/false, - req.filter(), - req.conflictTtl(i), - req.conflictExpireTime(i), - req.conflictVersion(i), - /*conflictResolve*/true, - intercept, - null, - ctx.disableTriggeringCacheInterceptorOnConflict() - ); - map.put(dataStore.createSearchRow(ctx, entry.key(), null), c); } - catch (IgniteCheckedException e) { - res.addFailedKey(entry.key(), e); + else { + updateSingleEntry( + c, + true, + nearNode, + hasNear, + taskName, + drType, + req, + res, + dhtUpdRes, + affAssignment, + sndPrevVal); } } + catch (IgniteCheckedException e) { + res.addFailedKey(entry.key(), e); + } + } - RuntimeException err = null; + if (!batchUpdate) + return; - for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> e0 : byPart.entrySet()) { - try { - Map<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = e0.getValue(); + RuntimeException err = null; - try { - ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get); - } - catch (UnregisteredClassException | UnregisteredBinaryTypeException e) { - err = e; - } + for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> e0 : byPart.entrySet()) { + try { + Map<CacheSearchRow, AtomicCacheUpdateClosure> map = e0.getValue(); - for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) { - AtomicCacheBatchUpdateClosure c = e.getValue(); + try { + ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get); + } + catch (UnregisteredClassException | UnregisteredBinaryTypeException e) { + err = e; + } - if (c.operationType() == null) { - dhtUpdRes.addRetryEntry(c.reqIdx); + for (Map.Entry<CacheSearchRow, AtomicCacheUpdateClosure> e : map.entrySet()) { + AtomicCacheUpdateClosure c = e.getValue(); - continue; - } + if (c.operationType() == null) { + dhtUpdRes.addRetryEntry(c.reqIdx); - updateSingleEntry( - (GridDhtCacheEntry)c.entry(), - c, - ver, - nearNode, - hasNear, - taskName, - expiry, - drType, - req, - res, - dhtUpdRes, - c.reqIdx, - affAssignment, - sndPrevVal); + continue; } - } - catch (IgniteCheckedException e) { - for (CacheSearchRow row : e0.getValue().keySet()) - res.addFailedKey(row.key(), e); - } - } - if (err != null) - throw err; - } else { - // Avoid iterator creation. - for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) { - // We are holding java-level locks on entries at this point. - // No GridCacheEntryRemovedException can be thrown. - try { updateSingleEntry( - locked.get(i), - null, - ver, + c, + false, nearNode, hasNear, taskName, - expiry, drType, req, res, dhtUpdRes, - i, affAssignment, sndPrevVal); } - catch (IgniteCheckedException e) { - res.addFailedKey(req.key(i), e); - } - - dhtUpdRes.processedEntriesCount(i + 1); + } + catch (IgniteCheckedException e) { + for (CacheSearchRow row : e0.getValue().keySet()) + res.addFailedKey(row.key(), e); } } + + if (err != null) + throw err; } /** @@ -2752,57 +2736,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @throws GridCacheEntryRemovedException Should be never thrown. */ private void updateSingleEntry( - GridDhtCacheEntry entry, - @Nullable AtomicCacheBatchUpdateClosure c, - GridCacheVersion ver, + AtomicCacheUpdateClosure c, + boolean updateOffheap, ClusterNode nearNode, boolean hasNear, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry, GridDrType drType, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, DhtAtomicUpdateResult dhtUpdRes, - int idx, AffinityAssignment affAssignment, boolean sndPrevVal ) throws IgniteCheckedException, GridCacheEntryRemovedException { - GridCacheVersion newConflictVer = req.conflictVersion(idx); - long newConflictTtl = req.conflictTtl(idx); - long newConflictExpireTime = req.conflictExpireTime(idx); - - assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - - Object writeVal = req.operation() == TRANSFORM ? req.entryProcessor(idx) : req.writeValue(idx); + GridDhtCacheEntry entry = (GridDhtCacheEntry)c.entry(); // Get readers before innerUpdate (reader cleared after remove). - GridDhtCacheEntry.ReaderId[] readers = c != null ? c.rdrs : entry.readersLocked(); + GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); GridCacheUpdateAtomicResult updRes = entry.innerUpdate( c, - ver, + updateOffheap, nearNode.id(), locNodeId, req.operation(), - writeVal, - req.invokeArguments(), - writeThrough() && !req.skipStore(), - !req.skipStore(), + req.operation() == TRANSFORM ? req.entryProcessor(c.reqIdx) : req.writeValue(c.reqIdx), sndPrevVal || req.returnValue(), - req.keepBinary(), - expiry, /*event*/true, /*metrics*/true, /*primary*/true, - /*verCheck*/false, req.topologyVersion(), - req.filter(), drType, - newConflictTtl, - newConflictExpireTime, - newConflictVer, - /*conflictResolve*/true, ctx.config().getInterceptor() != null, req.subjectId(), taskName, @@ -2819,6 +2783,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); + GridCacheVersion newConflictVer = req.conflictVersion(c.reqIdx); + + assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; + if (conflictCtx == null) newConflictVer = null; else if (conflictCtx.isMerge()) @@ -2861,14 +2829,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.sendToDht()) { if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), req.topologyVersion())) { // If put the same value as in request then do not need to send it back. - if (op == TRANSFORM || req.writeValue(idx) != updRes.newValue()) { - res.addNearValue(idx, + if (op == TRANSFORM || req.writeValue(c.reqIdx) != updRes.newValue()) { + res.addNearValue(c.reqIdx, updRes.newValue(), updRes.newTtl(), updRes.conflictExpireTime()); } else - res.addNearTtl(idx, updRes.newTtl(), updRes.conflictExpireTime()); + res.addNearTtl(c.reqIdx, updRes.newTtl(), updRes.conflictExpireTime()); if (updRes.newValue() != null) { IgniteInternalFuture<Boolean> f = @@ -2882,10 +2850,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry.removeReader(nearNode.id(), req.messageId()); } else - res.addSkippedIndex(idx); + res.addSkippedIndex(c.reqIdx); } else - res.addSkippedIndex(idx); + res.addSkippedIndex(c.reqIdx); } if (updRes.removeVersion() != null) { @@ -2959,7 +2927,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicUpdateResponse res, final DhtAtomicUpdateResult dhtUpdRes, @Nullable final IgniteCacheExpiryPolicy expiry, - Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart + Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart ) { assert putMap == null ^ rmvKeys == null; @@ -3044,14 +3012,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDhtLocalPartition part = entry.localPartition(); - TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure> map = byPart.get(part); + TreeMap<CacheSearchRow, AtomicCacheUpdateClosure> map = byPart.get(part); IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part); if (map == null) byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator())); - AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure( + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( firstEntryIdx + i, entry, topVer, @@ -3101,23 +3069,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** */ private void updateSingleEntryPartialBatch( - GridDhtCacheEntry entry, - @Nullable AtomicCacheBatchUpdateClosure c, - GridCacheVersion ver, + AtomicCacheUpdateClosure c, @Nullable final CacheObject writeVal, @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, ClusterNode nearNode, boolean hasNear, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry, GridDrType drType, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, DhtAtomicUpdateResult dhtUpdRes, - int idx, AffinityAssignment affAssignment, boolean sndPrevVal ) throws GridCacheEntryRemovedException, IgniteCheckedException { + GridDhtCacheEntry entry = (GridDhtCacheEntry)c.entry(); + final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); AffinityTopologyVersion topVer = req.topologyVersion(); @@ -3126,34 +3092,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); // Get readers before innerUpdate (reader cleared after remove). - GridDhtCacheEntry.ReaderId[] readers = c != null ? c.rdrs : entry.readersLocked(); + GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); GridCacheOperation op = writeVal == null ? DELETE : UPDATE; GridCacheUpdateAtomicResult updRes = entry.innerUpdate( c, - ver, + false, nearNode.id(), locNodeId, op, writeVal, - null, - /*write-through*/false, - /*read-through*/false, /*retval*/sndPrevVal, - req.keepBinary(), - expiry, /*event*/true, /*metrics*/true, /*primary*/true, - /*verCheck*/false, topVer, - null, drType, - CU.TTL_NOT_CHANGED, - CU.EXPIRE_TIME_CALCULATE, - null, - /*conflict resolve*/false, /*intercept*/false, req.subjectId(), taskName, @@ -3162,8 +3117,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut, entryProcessor != null); - assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : - "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; + assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || c.expiryPlc != null : + "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + c.expiryPlc; if (ctx.config().getInterceptor() != null) { if (op == UPDATE) { @@ -3213,13 +3168,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) { if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { if (req.operation() == TRANSFORM) { - res.addNearValue(idx, + res.addNearValue(c.reqIdx, writeVal, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } else - res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); + res.addNearTtl(c.reqIdx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); if (writeVal != null || entry.hasValue()) { IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); @@ -3232,7 +3187,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry.removeReader(nearNode.id(), req.messageId()); } else - res.addSkippedIndex(idx); + res.addSkippedIndex(c.reqIdx); } } @@ -3617,14 +3572,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheObject val = req.value(idx); EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); - Long updateIdx = req.updateCounter(idx); GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; - long ttl = req.ttl(idx); - long expireTime = req.conflictExpireTime(idx); - AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + batchStart + i, entry, req.topologyVersion(), req.writeVersion(), @@ -3638,12 +3590,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*primary*/false, /*check version*/!req.forceTransformBackups(), CU.empty0(), - ttl, - expireTime, - req.conflictVersion(i), + req.ttl(idx), + req.conflictExpireTime(idx), + req.conflictVersion(idx), false, intercept, - updateIdx, + req.updateCounter(idx), ctx.disableTriggeringCacheInterceptorOnConflict()); SearchRowEx<AtomicCacheUpdateClosure> row = (SearchRowEx<AtomicCacheUpdateClosure>) @@ -3710,13 +3662,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (batchSize == 1) { dhtAtomicUpdateRequestUpdateSingle( + batchStart, nodeId, req, nearRes, taskName, writeThrough, - intercept, - batchStart); + intercept); return; } @@ -3745,15 +3697,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AtomicCacheUpdateClosure c = locked.get(i).data(); dhtAtomicUpdateRequestUpdateSingleEntry( - c.entry(), c, + false, nodeId, req, nearRes, taskName, - writeThrough, - intercept, - batchStart + i); + intercept); } } finally { @@ -3798,52 +3748,38 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } private GridCacheUpdateAtomicResult dhtAtomicUpdateRequestUpdateSingleEntry( - GridCacheMapEntry entry, AtomicCacheUpdateClosure c, + boolean updateOffheap, UUID nodeId, GridDhtAtomicAbstractUpdateRequest req, @Nullable GridDhtAtomicNearResponse nearRes, String taskName, - boolean writeThrough, - boolean intercept, - int idx + boolean intercept ) throws NodeStoppingException, GridCacheEntryRemovedException { - CacheObject val = req.value(idx); - CacheObject prevVal = req.previousValue(idx); + GridCacheMapEntry entry = c.entry(); - EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(idx); - Long updateIdx = req.updateCounter(idx); + CacheObject val = req.value(c.reqIdx); + CacheObject prevVal = req.previousValue(c.reqIdx); - GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(c.reqIdx); + Long updateIdx = req.updateCounter(c.reqIdx); - long ttl = req.ttl(idx); - long expireTime = req.conflictExpireTime(idx); + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; try { return entry.innerUpdate( c, - req.writeVersion(), + updateOffheap, nodeId, nodeId, op, op == TRANSFORM ? entryProcessor : val, - op == TRANSFORM ? req.invokeArguments() : null, - writeThrough, - /*read-through*/false, /*retval*/false, - req.keepBinary(), - /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), req.topologyVersion(), - CU.empty0(), ctx.isDrEnabled() ? DR_BACKUP : DR_NONE, - ttl, - expireTime, - req.conflictVersion(idx), - false, intercept, req.subjectId(), taskName, @@ -3873,15 +3809,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } private void dhtAtomicUpdateRequestUpdateSingle( + int idx, UUID nodeId, GridDhtAtomicAbstractUpdateRequest req, @Nullable GridDhtAtomicNearResponse nearRes, String taskName, boolean writeThrough, - boolean intercept, - int i) throws NodeStoppingException + boolean intercept) throws NodeStoppingException { - KeyCacheObject key = req.key(i); + KeyCacheObject key = req.key(idx); try { while (true) { @@ -3890,16 +3826,43 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { entry = entryExx(key); - GridCacheUpdateAtomicResult updRes = dhtAtomicUpdateRequestUpdateSingleEntry( + CacheObject val = req.value(idx); + + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(idx); + + GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; + + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + idx, entry, - null, + req.topologyVersion(), + req.writeVersion(), + op, + op == TRANSFORM ? entryProcessor : val, + op == TRANSFORM ? req.invokeArguments() : null, + /*read-through*/false, + writeThrough, + req.keepBinary(), + /*expiry policy*/null, + /*primary*/false, + /*check version*/!req.forceTransformBackups(), + CU.empty0(), + req.ttl(idx), + req.conflictExpireTime(idx), + req.conflictVersion(idx), + false, + intercept, + req.updateCounter(idx), + ctx.disableTriggeringCacheInterceptorOnConflict()); + + GridCacheUpdateAtomicResult updRes = dhtAtomicUpdateRequestUpdateSingleEntry( + c, + true, nodeId, req, nearRes, taskName, - writeThrough, - intercept, - i); + intercept); if (updRes != null && updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -3970,7 +3933,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } else { for (int i = 0; i < req.size(); i++) - dhtAtomicUpdateRequestUpdateSingle(nodeId, req, nearRes, taskName, writeThrough, intercept, i); + dhtAtomicUpdateRequestUpdateSingle(i, nodeId, req, nearRes, taskName, writeThrough, intercept); } } catch (NodeStoppingException e){ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a908afa..fe873a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -37,6 +37,8 @@ import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry.AtomicCacheUpdateClosure; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -228,7 +230,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { boolean transformedValue) throws IgniteCheckedException { try { while (true) { - GridCacheEntryEx entry = null; + GridCacheMapEntry entry = null; AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); @@ -237,31 +239,43 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheOperation op = val != null ? UPDATE : DELETE; - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( - null, + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + 0, + entry, + topVer, ver, - nodeId, - nodeId, op, val, null, - /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*write-through*/false, keepBinary, - /*expiry policy*/null, - /*event*/true, - /*metrics*/true, - /*primary*/false, + null, + false, /*check version*/true, - topVer, CU.empty0(), - DR_NONE, ttl, expireTime, null, false, false, + null, + ctx.disableTriggeringCacheInterceptorOnConflict()); + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + c, + true, + nodeId, + nodeId, + op, + val, + /*retval*/false, + /*event*/true, + /*metrics*/true, + /*primary*/false, + topVer, + DR_NONE, + false, subjId, taskName, null, @@ -318,7 +332,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { try { while (true) { try { - GridCacheEntryEx entry = peekEx(key); + GridCacheMapEntry entry = (GridCacheMapEntry)peekEx(key); if (entry == null) { if (nearEvicted == null) @@ -335,33 +349,42 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; - long ttl = req.nearTtl(i); - long expireTime = req.nearExpireTime(i); - - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( - null, + AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure( + i, + entry, + req.topologyVersion(), ver, - nodeId, - nodeId, op, op == TRANSFORM ? entryProcessor : val, op == TRANSFORM ? req.invokeArguments() : null, - /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*write-through*/false, req.keepBinary(), null, + false, + /*check version*/!req.forceTransformBackups(), + CU.empty0(), + req.nearTtl(i), + req.nearExpireTime(i), + null, + false, + intercept, + null, + ctx.disableTriggeringCacheInterceptorOnConflict()); + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + c, + true, + nodeId, + nodeId, + op, + op == TRANSFORM ? entryProcessor : val, + /*retval*/false, /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), req.topologyVersion(), - CU.empty0(), DR_NONE, - ttl, - expireTime, - null, - false, intercept, req.subjectId(), taskName, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 711be67..4720415 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -537,36 +537,25 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr /** {@inheritDoc} */ @Override public GridCacheUpdateAtomicResult innerUpdate( @Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c, - GridCacheVersion ver, + boolean updateOffheap, UUID evtNodeId, UUID affNodeId, GridCacheOperation op, @Nullable Object val, - @Nullable Object[] invokeArgs, - boolean writeThrough, - boolean readThrough, boolean retval, - boolean keepBinary, - @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, - boolean checkVer, AffinityTopologyVersion topVer, - @Nullable CacheEntryPredicate[] filter, GridDrType drType, - long conflictTtl, - long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean conflictResolve, boolean intercept, - UUID subjId, + @Nullable UUID subjId, String taskName, @Nullable CacheObject prevVal, @Nullable Long updateCntr, @Nullable GridDhtAtomicAbstractUpdateFuture fut, - boolean transformOp) - throws IgniteCheckedException, GridCacheEntryRemovedException { + boolean transformOp + ) { assert false; return null;