ignite-6467 Fixed GridDhtInvalidPartitionException in lockAllAsync for client 
requests (tried to create cache entries before topology check)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d4c46b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d4c46b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d4c46b9

Branch: refs/heads/ignite-8446
Commit: 7d4c46b9bfd502712143d6e05d40546a5c3420fb
Parents: f13d2ed
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jul 25 22:42:23 2018 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jul 25 22:42:23 2018 +0300

----------------------------------------------------------------------
 .../dht/GridDhtTransactionalCacheAdapter.java   | 176 ++++++++-----------
 ...niteCacheClientNodeChangingTopologyTest.java |  95 ++++++++++
 2 files changed, 165 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4c46b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fe1aee9..161c542 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -917,39 +917,85 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
             GridDhtLockFuture fut = null;
 
-            if (!req.inTx()) {
-                GridDhtPartitionTopology top = null;
+            GridDhtPartitionTopology top = null;
 
-                if (req.firstClientRequest()) {
-                    assert CU.clientNode(nearNode);
+            if (req.firstClientRequest()) {
+                assert CU.clientNode(nearNode);
 
-                    top = topology();
+                top = topology();
 
-                    top.readLock();
+                top.readLock();
 
-                    if (!top.topologyVersionFuture().isDone()) {
-                        top.readUnlock();
+                if (!top.topologyVersionFuture().isDone()) {
+                    top.readUnlock();
 
-                        return null;
-                    }
+                    return null;
                 }
+            }
 
-                try {
-                    if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Client topology version mismatch, need remap lock request [" +
+            try {
+                if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Client topology version mismatch, need remap lock request [" +
                                 "reqTopVer=" + req.topologyVersion() +
                                 ", locTopVer=" + top.readyTopologyVersion() +
                                 ", req=" + req + ']');
-                        }
+                    }
 
-                        GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
-                            req,
-                            top.lastTopologyChangeVersion());
+                    GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                        req,
+                        top.lastTopologyChangeVersion());
 
-                        return new GridFinishedFuture<>(res);
-                    }
+                    return new GridFinishedFuture<>(res);
+                }
+
+                if (req.inTx()) {
+                    if (tx == null) {
+                        tx = new GridDhtTxLocal(
+                            ctx.shared(),
+                            req.topologyVersion(),
+                            nearNode.id(),
+                            req.version(),
+                            req.futureId(),
+                            req.miniId(),
+                            req.threadId(),
+                            /*implicitTx*/false,
+                            /*implicitSingleTx*/false,
+                            ctx.systemTx(),
+                            false,
+                            ctx.ioPolicy(),
+                            PESSIMISTIC,
+                            req.isolation(),
+                            req.timeout(),
+                            req.isInvalidate(),
+                            !req.skipStore(),
+                            false,
+                            req.txSize(),
+                            null,
+                            req.subjectId(),
+                            req.taskNameHash());
+
+                        if (req.syncCommit())
+                            tx.syncMode(FULL_SYNC);
+
+                        tx = ctx.tm().onCreated(null, tx);
 
+                        if (tx == null || !tx.init()) {
+                            String msg = "Failed to acquire lock (transaction has been completed): " +
+                                    req.version();
+
+                            U.warn(log, msg);
+
+                            if (tx != null)
+                                tx.rollbackDhtLocal();
+
+                            return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg));
+                        }
+
+                        tx.topologyVersion(req.topologyVersion());
+                    }
+                }
+                else {
                     fut = new GridDhtLockFuture(ctx,
                         nearNode.id(),
                         req.version(),
@@ -970,10 +1016,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     if (!ctx.mvcc().addFuture(fut))
                         throw new IllegalStateException("Duplicate future ID: " + fut);
                 }
-                finally {
-                    if (top != null)
-                        top.readUnlock();
-                }
+            }
+            finally {
+                if (top != null)
+                    top.readUnlock();
             }
 
             boolean timedOut = false;
@@ -1022,88 +1068,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
             // Handle implicit locks for pessimistic transactions.
             if (req.inTx()) {
-                if (tx == null) {
-                    GridDhtPartitionTopology top = null;
-
-                    if (req.firstClientRequest()) {
-                        assert CU.clientNode(nearNode);
-
-                        top = topology();
-
-                        top.readLock();
-
-                        if (!top.topologyVersionFuture().isDone()) {
-                            top.readUnlock();
-
-                            return null;
-                        }
-                    }
-
-                    try {
-                        if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Client topology version mismatch, need remap lock request [" +
-                                    "reqTopVer=" + req.topologyVersion() +
-                                    ", locTopVer=" + top.readyTopologyVersion() +
-                                    ", req=" + req + ']');
-                            }
-
-                            GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
-                                req,
-                                top.lastTopologyChangeVersion());
-
-                            return new GridFinishedFuture<>(res);
-                        }
-
-                        tx = new GridDhtTxLocal(
-                            ctx.shared(),
-                            req.topologyVersion(),
-                            nearNode.id(),
-                            req.version(),
-                            req.futureId(),
-                            req.miniId(),
-                            req.threadId(),
-                            /*implicitTx*/false,
-                            /*implicitSingleTx*/false,
-                            ctx.systemTx(),
-                            false,
-                            ctx.ioPolicy(),
-                            PESSIMISTIC,
-                            req.isolation(),
-                            req.timeout(),
-                            req.isInvalidate(),
-                            !req.skipStore(),
-                            false,
-                            req.txSize(),
-                            null,
-                            req.subjectId(),
-                            req.taskNameHash());
-
-                        if (req.syncCommit())
-                            tx.syncMode(FULL_SYNC);
-
-                        tx = ctx.tm().onCreated(null, tx);
-
-                        if (tx == null || !tx.init()) {
-                            String msg = "Failed to acquire lock (transaction has been completed): " +
-                                req.version();
-
-                            U.warn(log, msg);
-
-                            if (tx != null)
-                                tx.rollbackDhtLocal();
-
-                            return new GridDhtFinishedFuture<>(new IgniteTxRollbackCheckedException(msg));
-                        }
-
-                        tx.topologyVersion(req.topologyVersion());
-                    }
-                    finally {
-                        if (top != null)
-                            top.readUnlock();
-                    }
-                }
-
                 ctx.tm().txContext(tx);
 
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4c46b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index c82b4b9..10c5f37 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -504,6 +505,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         checkData(map, null, cache, 4);
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -928,6 +930,87 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     }
 
     /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration testPessimisticTx3Cfg() {
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(0);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTx3() throws Exception {
+        for (int iter = 0; iter < 5; iter++) {
+            info("Iteration: " + iter);
+
+            ccfg = testPessimisticTx3Cfg();
+
+            IgniteEx ignite0 = startGrid(0);
+
+            Map<Integer, Integer> map = new HashMap<>();
+
+            final IgniteCache<Integer, Integer> cache0 = ignite0.cache(DEFAULT_CACHE_NAME);
+
+            for (int i = 0; i < 10000; i++) {
+                cache0.put(i, i);
+                map.put(i, i + 1);
+            }
+
+            client = true;
+
+            ccfg = testPessimisticTx3Cfg();
+
+            final Ignite ignite3 = startGrid(3);
+
+            final IgniteCache<Integer, Integer> cache = ignite3.cache(DEFAULT_CACHE_NAME);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+            spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id());
+
+            IgniteInternalFuture putFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Thread.currentThread().setName("put-thread");
+
+                    try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.putAll(map);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            });
+
+            spi.waitForBlocked();
+
+            client = false;
+
+            ccfg = testPessimisticTx3Cfg();
+
+            startGrid(1);
+
+            // Want provoke case when client req is processed when target partition is RENTING,
+            // there is no easy way to do it, so just try sleep.
+            U.sleep(ThreadLocalRandom.current().nextInt(1000) + 100);
+
+            spi.stopBlock();
+
+            putFut.get();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testOptimisticSerializableTx() throws Exception {
@@ -2004,6 +2087,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
                         blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
 
+                        notifyAll();
+
                         return;
                     }
                 }
@@ -2072,6 +2157,16 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                 blockedMsgs.clear();
             }
         }
+
+        /**
+         * @throws InterruptedException If interrupted.
+         */
+        public void waitForBlocked() throws InterruptedException {
+            synchronized (this) {
+                while (blockedMsgs.isEmpty())
+                    wait();
+            }
+        }
     }
 
     /**

Reply via email to