IGNITE-1482 - Fixed incorrect cache value for replace() on changing topology.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/367d805d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/367d805d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/367d805d Branch: refs/heads/ignite-gg-10760 Commit: 367d805d10ea071532fe99c6b67cfc97cc8f2fb9 Parents: 91dd7c1 Author: sboikov <sboi...@gridgain.com> Authored: Tue Sep 15 14:54:20 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Sep 15 14:54:20 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedTxRemoteAdapter.java | 8 +-- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../IgniteCacheEntryProcessorNodeJoinTest.java | 73 ++++++++++++++++++++ 3 files changed, 78 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c930d88..f969737 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -521,7 +521,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (updateNearCache(cacheCtx, txEntry.key(), topVer)) nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) + if (!F.isEmpty(txEntry.entryProcessors())) txEntry.cached().unswap(false); IgniteBiTuple<GridCacheOperation, CacheObject> res = @@ -573,12 +573,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Invalidate only for near nodes (backups cannot be invalidated). if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, + topVer, null, replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); else { cached.innerSet(this, eventNodeId(), nodeId, val, false, false, - txEntry.ttl(), true, true, topVer, txEntry.filters(), + txEntry.ttl(), true, true, topVer, null, replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(), near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); @@ -598,7 +598,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } else if (op == DELETE) { cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, + topVer, null, replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); // Keep near entry up to date. http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 89fc0ae..81cc272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -842,7 +842,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteTxEntry e, Map<Integer, Collection<KeyCacheObject>> map ) { - if (retVal || !F.isEmpty(e.entryProcessors())) { + if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) { if (map == null) map = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/367d805d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java index af9477e..6b4d473 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -30,7 +30,9 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -201,6 +203,77 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes } } + /** + * @throws Exception If failed. + */ + public void testReplaceNodeJoin() throws Exception { + final AtomicReference<Throwable> error = new AtomicReference<>(); + final int started = 6; + + try { + int keys = 100; + + final AtomicBoolean done = new AtomicBoolean(false); + + for (int i = 0; i < keys; i++) + ignite(0).cache(null).put(i, 0); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < started; i++) { + U.sleep(1_000); + + IgniteEx grid = startGrid(GRID_CNT + i); + + info("Test started grid [idx=" + (GRID_CNT + i) + ", nodeId=" + grid.localNode().id() + ']'); + } + } + catch (Exception e) { + error.compareAndSet(null, e); + } + finally { + done.set(true); + } + } + }, 1, "starter"); + + int updVal = 0; + + try { + while (!done.get()) { + info("Will put: " + (updVal + 1)); + + for (int i = 0; i < keys; i++) + assertTrue("Failed [key=" + i + ", oldVal=" + updVal+ ']', + ignite(0).cache(null).replace(i, updVal, updVal + 1)); + + updVal++; + } + } + finally { + fut.get(getTestTimeout()); + } + + for (int i = 0; i < keys; i++) { + for (int g = 0; g < GRID_CNT + started; g++) { + Integer val = ignite(g).<Integer, Integer>cache(null).get(i); + + GridCacheEntryEx entry = ((IgniteKernal)grid(g)).internalCache(null).peekEx(i); + + if (updVal != val) + info("Invalid value for grid [g=" + g + ", entry=" + entry + ']'); + + assertEquals((Integer)updVal, val); + } + } + } + finally { + for (int i = 0; i < started; i++) + stopGrid(GRID_CNT + i); + } + } + /** */ private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable { /** */