ignite-6467: Fixed GridDhtInvalidPartitionException in lockAllAsync for client requests (the method previously tried to create cache entries before performing the topology check)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d4c46b9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d4c46b9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d4c46b9 Branch: refs/heads/ignite-8446 Commit: 7d4c46b9bfd502712143d6e05d40546a5c3420fb Parents: f13d2ed Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 25 22:42:23 2018 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 25 22:42:23 2018 +0300 ---------------------------------------------------------------------- .../dht/GridDhtTransactionalCacheAdapter.java | 176 ++++++++----------- ...niteCacheClientNodeChangingTopologyTest.java | 95 ++++++++++ 2 files changed, 165 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4c46b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index fe1aee9..161c542 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -917,39 +917,85 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach GridDhtLockFuture fut = null; - if (!req.inTx()) { - GridDhtPartitionTopology top = null; + GridDhtPartitionTopology top = null; - if (req.firstClientRequest()) { - assert CU.clientNode(nearNode); + if (req.firstClientRequest()) 
{ + assert CU.clientNode(nearNode); - top = topology(); + top = topology(); - top.readLock(); + top.readLock(); - if (!top.topologyVersionFuture().isDone()) { - top.readUnlock(); + if (!top.topologyVersionFuture().isDone()) { + top.readUnlock(); - return null; - } + return null; } + } - try { - if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap lock request [" + + try { + if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + "reqTopVer=" + req.topologyVersion() + ", locTopVer=" + top.readyTopologyVersion() + ", req=" + req + ']'); - } + } - GridNearLockResponse res = sendClientLockRemapResponse(nearNode, - req, - top.lastTopologyChangeVersion()); + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.lastTopologyChangeVersion()); - return new GridFinishedFuture<>(res); - } + return new GridFinishedFuture<>(res); + } + + if (req.inTx()) { + if (tx == null) { + tx = new GridDhtTxLocal( + ctx.shared(), + req.topologyVersion(), + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + /*implicitTx*/false, + /*implicitSingleTx*/false, + ctx.systemTx(), + false, + ctx.ioPolicy(), + PESSIMISTIC, + req.isolation(), + req.timeout(), + req.isInvalidate(), + !req.skipStore(), + false, + req.txSize(), + null, + req.subjectId(), + req.taskNameHash()); + + if (req.syncCommit()) + tx.syncMode(FULL_SYNC); + + tx = ctx.tm().onCreated(null, tx); + if (tx == null || !tx.init()) { + String msg = "Failed to acquire lock (transaction has been completed): " + + req.version(); + + U.warn(log, msg); + + if (tx != null) + tx.rollbackDhtLocal(); + + return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg)); + } + + tx.topologyVersion(req.topologyVersion()); + } + } + 
else { fut = new GridDhtLockFuture(ctx, nearNode.id(), req.version(), @@ -970,10 +1016,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (!ctx.mvcc().addFuture(fut)) throw new IllegalStateException("Duplicate future ID: " + fut); } - finally { - if (top != null) - top.readUnlock(); - } + } + finally { + if (top != null) + top.readUnlock(); } boolean timedOut = false; @@ -1022,88 +1068,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Handle implicit locks for pessimistic transactions. if (req.inTx()) { - if (tx == null) { - GridDhtPartitionTopology top = null; - - if (req.firstClientRequest()) { - assert CU.clientNode(nearNode); - - top = topology(); - - top.readLock(); - - if (!top.topologyVersionFuture().isDone()) { - top.readUnlock(); - - return null; - } - } - - try { - if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap lock request [" + - "reqTopVer=" + req.topologyVersion() + - ", locTopVer=" + top.readyTopologyVersion() + - ", req=" + req + ']'); - } - - GridNearLockResponse res = sendClientLockRemapResponse(nearNode, - req, - top.lastTopologyChangeVersion()); - - return new GridFinishedFuture<>(res); - } - - tx = new GridDhtTxLocal( - ctx.shared(), - req.topologyVersion(), - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - /*implicitTx*/false, - /*implicitSingleTx*/false, - ctx.systemTx(), - false, - ctx.ioPolicy(), - PESSIMISTIC, - req.isolation(), - req.timeout(), - req.isInvalidate(), - !req.skipStore(), - false, - req.txSize(), - null, - req.subjectId(), - req.taskNameHash()); - - if (req.syncCommit()) - tx.syncMode(FULL_SYNC); - - tx = ctx.tm().onCreated(null, tx); - - if (tx == null || !tx.init()) { - String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); - - U.warn(log, msg); - - if 
(tx != null) - tx.rollbackDhtLocal(); - - return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg)); - } - - tx.topologyVersion(req.topologyVersion()); - } - finally { - if (top != null) - top.readUnlock(); - } - } - ctx.tm().txContext(tx); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4c46b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index c82b4b9..10c5f37 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -45,6 +45,7 @@ import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -504,6 +505,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac checkData(map, null, cache, 4); } + /** * @throws Exception If failed. */ @@ -928,6 +930,87 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } /** + * @return Cache configuration. 
+ */ + private CacheConfiguration testPessimisticTx3Cfg() { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(0); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTx3() throws Exception { + for (int iter = 0; iter < 5; iter++) { + info("Iteration: " + iter); + + ccfg = testPessimisticTx3Cfg(); + + IgniteEx ignite0 = startGrid(0); + + Map<Integer, Integer> map = new HashMap<>(); + + final IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 10000; i++) { + cache0.put(i, i); + map.put(i, i + 1); + } + + client = true; + + ccfg = testPessimisticTx3Cfg(); + + final Ignite ignite3 = startGrid(3); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(DEFAULT_CACHE_NAME); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + + IgniteInternalFuture putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + return null; + } + }); + + spi.waitForBlocked(); + + client = false; + + ccfg = testPessimisticTx3Cfg(); + + startGrid(1); + + // Want provoke case when client req is processed when target partition is RENTING, + // there is no easy way to do it, so just try sleep. + U.sleep(ThreadLocalRandom.current().nextInt(1000) + 100); + + spi.stopBlock(); + + putFut.get(); + + stopAllGrids(); + } + } + + /** * @throws Exception If failed. 
*/ public void testOptimisticSerializableTx() throws Exception { @@ -2004,6 +2087,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + notifyAll(); + return; } } @@ -2072,6 +2157,16 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac blockedMsgs.clear(); } } + + /** + * @throws InterruptedException If interrupted. + */ + public void waitForBlocked() throws InterruptedException { + synchronized (this) { + while (blockedMsgs.isEmpty()) + wait(); + } + } } /**