ignite-2645: Fixed assertion error in ATOMIC cache for invokeAll and cache store
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18e355bb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18e355bb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18e355bb Branch: refs/heads/ignite-testing-discovery Commit: 18e355bb1e466cc5ded836b228a721adceb09df5 Parents: c28fadd Author: ashutak <[email protected]> Authored: Mon Apr 11 19:48:00 2016 +0300 Committer: ashutak <[email protected]> Committed: Mon Apr 11 19:48:00 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 7 +- .../processors/cache/GridCacheEntryEx.java | 10 +- .../processors/cache/GridCacheMapEntry.java | 65 ++--- .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 1 + .../dht/GridPartitionedGetFuture.java | 5 +- .../dht/GridPartitionedSingleGetFuture.java | 5 +- .../dht/atomic/GridDhtAtomicCache.java | 55 ++-- .../dht/colocated/GridDhtColocatedCache.java | 5 +- .../distributed/near/GridNearGetFuture.java | 10 +- .../local/atomic/GridLocalAtomicCache.java | 17 +- .../cache/transactions/IgniteTxAdapter.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 34 ++- .../EntryVersionConsistencyReadThroughTest.java | 265 +++++++++++++++++++ .../processors/cache/GridCacheTestEntryEx.java | 5 +- .../testsuites/IgniteCacheTestSuite5.java | 4 +- 16 files changed, 417 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 6d4ee58..46b6677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1881,7 +1881,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } try { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null, + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + null, ctx.isSwapOrOffheapEnabled(), /*unmarshal*/true, /*update-metrics*/!skipVals, @@ -2332,7 +2334,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -5010,6 +5012,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V { CacheObject val = entry.innerGet( null, + null, false, false, false, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 29283e2..a7880e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -273,6 +273,7 @@ public interface GridCacheEntryEx { public boolean partitionValid(); /** + * @param ver Cache version to set. The version will be used on updating entry instead of generated one. * @param tx Ongoing transaction (possibly null). * @param readSwap Flag indicating whether to check swap memory. * @param readThrough Flag indicating whether to read through. @@ -288,11 +289,13 @@ public interface GridCacheEntryEx { * @param transformClo Transform closure to record event. * @param taskName Task name. * @param expiryPlc Expiry policy. + * @param keepBinary Keep binary flag. * @return Cached value. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public CacheObject innerGet(@Nullable IgniteInternalTx tx, + @Nullable public CacheObject innerGet(@Nullable GridCacheVersion ver, + @Nullable IgniteInternalTx tx, boolean readSwap, boolean readThrough, boolean failFast, @@ -308,6 +311,7 @@ public interface GridCacheEntryEx { throws IgniteCheckedException, GridCacheEntryRemovedException; /** + * @param ver Cache version to set. The version will be used on updating entry instead of generated one. * @param tx Cache transaction. * @param readSwap Flag indicating whether to check swap memory. * @param unmarshal Unmarshal flag. @@ -317,11 +321,13 @@ public interface GridCacheEntryEx { * @param transformClo Transform closure to record event. * @param taskName Task name. * @param expiryPlc Expiry policy. + * @param keepBinary Keep binary flag. * @return Cached value and entry version. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned( + @Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean readSwap, boolean unmarshal, @@ -354,6 +360,7 @@ public interface GridCacheEntryEx { * @param ttl Time to live. * @param evt Flag to signal event notification. * @param metrics Flag to signal metrics update. + * @param keepBinary Keep binary flag. * @param topVer Topology version. * @param filter Filter. * @param drType DR type. @@ -397,6 +404,7 @@ public interface GridCacheEntryEx { * @param retval {@code True} if value should be returned (and unmarshalled if needed). * @param evt Flag to signal event notification. * @param metrics Flag to signal metrics notification. + * @param keepBinary Keep binary flag. * @param topVer Topology version. * @param filter Filter. * @param drType DR type. http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 83f5d5c..f31a992 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -662,7 +662,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Nullable @Override public final CacheObject innerGet(@Nullable IgniteInternalTx tx, + @Nullable @Override public final CacheObject innerGet(@Nullable GridCacheVersion ver, + @Nullable IgniteInternalTx tx, boolean readSwap, boolean readThrough, boolean failFast, @@ -676,7 +677,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteCacheExpiryPolicy expirePlc, boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { - return (CacheObject)innerGet0(tx, + return (CacheObject)innerGet0(ver, + tx, readSwap, readThrough, evt, @@ -693,6 +695,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + @Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean readSwap, boolean unmarshal, @@ -704,7 +707,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { - return (T2<CacheObject, GridCacheVersion>)innerGet0(tx, + return (T2<CacheObject, GridCacheVersion>)innerGet0(ver, + tx, readSwap, false, evt, @@ -721,7 +725,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "RedundantTypeArguments", "TooBroadScope"}) - private Object innerGet0(IgniteInternalTx tx, + private Object innerGet0( + GridCacheVersion nextVer, + IgniteInternalTx tx, boolean readSwap, boolean readThrough, boolean evt, @@ -937,7 +943,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Detach value before index update. ret = cctx.kernalContext().cacheObjects().prepareForCache(ret, cctx); - GridCacheVersion nextVer = nextVersion(); + nextVer = nextVer != null ? nextVer : nextVersion(); CacheObject prevVal = rawGetOrUnmarshalUnlocked(false); @@ -949,7 +955,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean hadValPtr = hasOffHeapPointer(); - // Don't change version for read-through. update(ret, expTime, ttl, nextVer, true); if (hadValPtr && cctx.offheapTiered()) @@ -1832,32 +1837,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult innerUpdate( GridCacheVersion newVer, - UUID evtNodeId, - UUID affNodeId, + final UUID evtNodeId, + final UUID affNodeId, GridCacheOperation op, @Nullable Object writeObj, - @Nullable Object[] invokeArgs, - boolean writeThrough, - boolean readThrough, - boolean retval, - boolean keepBinary, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean evt, - boolean metrics, - boolean primary, - boolean verCheck, - AffinityTopologyVersion topVer, - @Nullable CacheEntryPredicate[] filter, - GridDrType drType, - long explicitTtl, - long explicitExpireTime, + @Nullable final Object[] invokeArgs, + final boolean writeThrough, + final boolean readThrough, + final boolean retval, + final boolean keepBinary, + @Nullable final IgniteCacheExpiryPolicy expiryPlc, + final boolean evt, + final boolean metrics, + final boolean primary, + final boolean verCheck, + final AffinityTopologyVersion topVer, + @Nullable final CacheEntryPredicate[] filter, + final GridDrType drType, + final long explicitTtl, + final long explicitExpireTime, @Nullable GridCacheVersion conflictVer, - boolean conflictResolve, - boolean intercept, - @Nullable UUID subjId, - String taskName, - @Nullable CacheObject prevVal, - @Nullable Long updateCntr + final boolean conflictResolve, + final boolean intercept, + @Nullable final UUID subjId, + final String taskName, + @Nullable final CacheObject prevVal, + @Nullable final Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -2097,7 +2102,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } else assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 : - "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']'; + "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']'; } // Prepare old value and value bytes. http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index ae24ed1..d0b8092 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1123,7 +1123,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach CacheObject val = null; if (ret) - val = e.innerGet(tx, + val = e.innerGet( + null, + tx, /*swap*/true, /*read-through*/false, /*fail-fast.*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 00cbd29..44d7bc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -347,6 +347,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter final boolean keepBinary = txEntry.keepBinary(); CacheObject val = cached.innerGet( + null, tx, /*swap*/true, readThrough, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 037d408..7d2c5db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -451,6 +451,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -467,7 +468,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } } else { - v = entry.innerGet(null, + v = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/false, /*fail-fast*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 01e61bf..b0b9d7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -375,6 +375,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -391,7 +392,9 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } } else { - v = entry.innerGet(null, + v = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/false, /*fail-fast*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 11fd855..cae13e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1198,6 +1198,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -1215,6 +1216,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } else { v = entry.innerGet(null, + null, /*swap*/true, /*read-through*/false, /*fail-fast*/true, @@ -1557,18 +1559,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private UpdateBatchResult updateWithBatch( - ClusterNode node, - boolean hasNear, - GridNearAtomicUpdateRequest req, - GridNearAtomicUpdateResponse res, - List<GridDhtCacheEntry> locked, - GridCacheVersion ver, + final ClusterNode node, + final boolean hasNear, + final GridNearAtomicUpdateRequest req, + final GridNearAtomicUpdateResponse res, + final List<GridDhtCacheEntry> locked, + final GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, - boolean replicate, - String taskName, - @Nullable IgniteCacheExpiryPolicy expiry, - boolean sndPrevVal + final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final boolean replicate, + final String taskName, + @Nullable final IgniteCacheExpiryPolicy expiry, + final boolean sndPrevVal ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -1642,6 +1644,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); CacheObject old = entry.innerGet( + ver, null, /*read swap*/true, /*read through*/true, @@ -1799,6 +1802,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (intercept) { CacheObject old = entry.innerGet( null, + null, /*read swap*/true, /*read through*/ctx.loadPreviousValue(), /*fail fast*/false, @@ -1838,6 +1842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (intercept) { CacheObject old = entry.innerGet( null, + null, /*read swap*/true, /*read through*/ctx.loadPreviousValue(), /*fail fast*/false, @@ -2227,24 +2232,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("ForLoopReplaceableByForEach") @Nullable private GridDhtAtomicUpdateFuture updatePartialBatch( - boolean hasNear, - int firstEntryIdx, - List<GridDhtCacheEntry> entries, + final boolean hasNear, + final int firstEntryIdx, + final List<GridDhtCacheEntry> entries, final GridCacheVersion ver, - ClusterNode node, - @Nullable List<CacheObject> writeVals, - @Nullable Map<KeyCacheObject, CacheObject> putMap, - @Nullable Collection<KeyCacheObject> rmvKeys, - @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, + final ClusterNode node, + @Nullable final List<CacheObject> writeVals, + @Nullable final Map<KeyCacheObject, CacheObject> putMap, + @Nullable final Collection<KeyCacheObject> rmvKeys, + @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final GridNearAtomicUpdateRequest req, final GridNearAtomicUpdateResponse res, - boolean replicate, - UpdateBatchResult batchRes, - String taskName, - @Nullable IgniteCacheExpiryPolicy expiry, - boolean sndPrevVal + final boolean replicate, + final UpdateBatchResult batchRes, + final String taskName, + @Nullable final IgniteCacheExpiryPolicy expiry, + final boolean sndPrevVal ) { assert putMap == null ^ rmvKeys == null; http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index dc4b6bd..b954545 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -485,6 +485,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -501,7 +502,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } } else { - v = entry.innerGet(null, + v = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/false, /*fail-fast*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index e65a44a..edac92c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -435,6 +435,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/true, @@ -451,7 +452,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } else { - v = entry.innerGet(tx, + v = entry.innerGet( + null, + tx, /*swap*/false, /*read-through*/false, /*fail-fast*/true, @@ -574,6 +577,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (needVer) { T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned( null, + null, /*swap*/true, /*unmarshal*/true, /**update-metrics*/false, @@ -590,7 +594,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } else { - v = dhtEntry.innerGet(tx, + v = dhtEntry.innerGet( + null, + tx, /*swap*/true, /*read-through*/false, /*fail-fast*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 651c4d0..3006a70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -595,6 +595,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (needVer) { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( null, + null, /*swap*/swapOrOffheap, /*unmarshal*/true, /**update-metrics*/false, @@ -622,7 +623,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { success = false; } else { - v = entry.innerGet(null, + v = entry.innerGet( + null, + null, /*swap*/swapOrOffheap, /*read-through*/false, /*fail-fast*/false, @@ -1179,7 +1182,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { EntryProcessor<Object, Object, Object> entryProcessor = (EntryProcessor<Object, Object, Object>)val; - CacheObject old = entry.innerGet(null, + CacheObject old = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/true, /*fail-fast*/false, @@ -1301,7 +1306,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { CacheObject cacheVal = ctx.toCacheObject(val); if (intercept) { - CacheObject old = entry.innerGet(null, + CacheObject old = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/ctx.loadPreviousValue(), /*fail-fast*/false, @@ -1336,7 +1343,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { assert op == DELETE; if (intercept) { - CacheObject old = entry.innerGet(null, + CacheObject old = entry.innerGet( + null, + null, /*swap*/true, /*read-through*/ctx.loadPreviousValue(), /*fail-fast*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 6af388b..34f1fa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1243,7 +1243,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter final boolean keepBinary = txEntry.keepBinary(); CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : - txEntry.cached().innerGet(this, + txEntry.cached().innerGet( + null, + this, /*swap*/false, /*read through*/false, /*fail fast*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 785e937..dc1ec43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -445,7 +445,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter continue; try { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + this, /*readSwap*/true, /*unmarshal*/true, /*update-metrics*/!skipVals, @@ -1449,6 +1451,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (needVer) { T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned( + null, this, /*swap*/true, /*unmarshal*/true, @@ -1466,7 +1469,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - val = txEntry.cached().innerGet(this, + val = txEntry.cached().innerGet( + null, + this, /*swap*/true, /*read-through*/false, /*fail fast*/true, @@ -1531,7 +1536,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (needReadVer) { T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? - entry.innerGetVersioned(this, + entry.innerGetVersioned( + null, + this, /*swap*/true, /*unmarshal*/true, /*metrics*/true, @@ -1548,7 +1555,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - val = entry.innerGet(this, + val = entry.innerGet( + null, + this, /*swap*/true, /*no read-through*/false, /*fail-fast*/true, @@ -1869,6 +1878,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (needVer) { T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned( + null, IgniteTxLocalAdapter.this, /*swap*/cacheCtx.isSwapOrOffheapEnabled(), /*unmarshal*/true, @@ -1886,7 +1896,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else{ - val = cached.innerGet(IgniteTxLocalAdapter.this, + val = cached.innerGet( + null, + IgniteTxLocalAdapter.this, cacheCtx.isSwapOrOffheapEnabled(), /*read-through*/false, /*fail-fast*/true, @@ -2532,7 +2544,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter try { if (needReadVer) { T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? - entry.innerGetVersioned(this, + entry.innerGetVersioned( + null, + this, /*swap*/false, /*unmarshal*/retval, /*metrics*/retval, @@ -2549,7 +2563,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { - old = entry.innerGet(this, + old = entry.innerGet( + null, + this, /*swap*/false, /*read-through*/false, /*fail-fast*/false, @@ -2853,7 +2869,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean readThrough = (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore(); - v = cached.innerGet(this, + v = cached.innerGet( + null, + this, /*swap*/true, readThrough, /*failFast*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/EntryVersionConsistencyReadThroughTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/EntryVersionConsistencyReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/EntryVersionConsistencyReadThroughTest.java new file mode 100644 index 0000000..4216b5e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/EntryVersionConsistencyReadThroughTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import javax.cache.Cache.Entry; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests consistency of entry's versions after invokeAll. + */ +public class EntryVersionConsistencyReadThroughTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_CNT = 5; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param atomicityMode Atomicity mode. + * @return Cache configuration. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private CacheConfiguration<String, List<Double>> createCacheConfiguration(CacheAtomicityMode atomicityMode) { + CacheConfiguration<String, List<Double>> cc = new CacheConfiguration<>(); + + cc.setCacheMode(PARTITIONED); + cc.setAtomicityMode(atomicityMode); + cc.setAtomicWriteOrderMode(PRIMARY); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setReadThrough(true); + cc.setWriteThrough(true); + + Factory cacheStoreFactory = new FactoryBuilder.SingletonFactory(new DummyCacheStore()); + + cc.setCacheStoreFactory(cacheStoreFactory); + + cc.setBackups(2); + + return cc; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES_CNT - 1); + + client = true; + + startGrid(NODES_CNT - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllTransactionalCache() throws Exception { + check(false, createCacheConfiguration(TRANSACTIONAL)); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAllAtomicCache() throws Exception { + check(false, createCacheConfiguration(ATOMIC)); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAtomicCache() throws Exception { + check(true, createCacheConfiguration(ATOMIC)); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeTransactionalCache() throws Exception { + check(true, createCacheConfiguration(TRANSACTIONAL)); + } + + /** + * Tests entry's versions consistency after invokeAll. + * + * @param single Single invoke or invokeAll. + * @param cc Cache configuration. + * @throws Exception If failed. + */ + @SuppressWarnings("serial") + private void check(boolean single, CacheConfiguration cc) throws Exception { + grid(0).getOrCreateCache(cc); + + try { + final int cnt = 100; + + for (int i = 0; i < NODES_CNT; i++) { + final int iter = i; + + final Set<String> keys = new LinkedHashSet<String>() {{ + for (int i = 0; i < cnt; i++) + add("key-" + iter + "-" + i); + }}; + + IgniteEx grid = grid(i); + + final IgniteCache<String, Integer> cache = grid.cache(null); + + if (single) + for (String key : keys) + cache.invoke(key, new DummyEntryProcessor()); + else + cache.invokeAll(keys, new DummyEntryProcessor()); + + // Check entry versions consistency. + for (String key : keys) { + Collection<ClusterNode> nodes = grid.affinity(null).mapKeyToPrimaryAndBackups(key); + + List<IgniteEx> grids = grids(nodes); + + GridCacheVersion ver0 = null; + Object val0 = null; + + for (IgniteEx g : grids) { + GridCacheAdapter<Object, Object> cx = g.context().cache().internalCache(); + + GridCacheEntryEx e = cx.peekEx(key); + + assertNotNull("Failed to find entry on primary/backup node.", e); + + GridCacheVersion ver = e.version(); + Object val = e.rawGet().value(cx.context().cacheObjectContext(), true); + + if (ver0 == null) { + ver0 = ver; + val0 = val; + } + + assertEquals("Invalid version for key: " + key, ver0, ver); + + assertNotNull("No value for key: " + key, val); + assertEquals("Invalid value for key: " + key, val0, val); + } + } + } + } + finally { + grid(0).destroyCache(null); + } + } + + /** + * @param nodes Nodes. + * @return Grids. + */ + private List<IgniteEx> grids(Collection<ClusterNode> nodes) { + List<IgniteEx> grids = new ArrayList<>(); + + for (ClusterNode node : nodes) { + for (int i = 0; i < NODES_CNT; i++) { + if (grid(i).cluster().localNode().id().equals(node.id())) { + grids.add(grid(i)); + + break; + } + } + } + + return grids; + } + + /** + * + */ + private static class DummyEntryProcessor implements EntryProcessor<String, Integer, Integer> { + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<String, Integer> entry, Object... arguments) { + Integer currVal = entry.getValue(); + + if (currVal == null) + entry.setValue(0); + else + entry.setValue(currVal + 1); + + return null; + } + } + + /** + * + */ + @SuppressWarnings("serial") + private static class DummyCacheStore extends CacheStoreAdapter<String, Integer> implements Serializable { + /** {@inheritDoc} */ + @Override public Integer load(String key) throws CacheLoaderException { + return 1; + } + + /** {@inheritDoc} */ + @Override public void write(Entry<? extends String, ? extends Integer> entry) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 97b7a8f..1bede92 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -426,7 +426,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public CacheObject innerGet(@Nullable IgniteInternalTx tx, + @Override public CacheObject innerGet( + @Nullable GridCacheVersion ver, + @Nullable IgniteInternalTx tx, boolean readSwap, boolean readThrough, boolean failFast, @@ -444,6 +446,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr /** @inheritDoc */ @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + @Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean readSwap, boolean unmarshal, http://git-wip-us.apache.org/repos/asf/ignite/blob/18e355bb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 3eb0b13..c5c5c67 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; +import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadThroughTest; import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; @@ -40,7 +41,8 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(IgniteCacheStoreCollectionTest.class); suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class); suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class); + suite.addTestSuite(EntryVersionConsistencyReadThroughTest.class); return suite; } -} \ No newline at end of file +}
