ignite-7049 Fixed error in tx timeout processing for optimistic/serializable tx


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd0d2ebf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd0d2ebf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd0d2ebf

Branch: refs/heads/ignite-zk
Commit: cd0d2ebf6a4294be851fb316f9b2f6b4ce6cb321
Parents: 9398813
Author: Aleksei Scherbakov <[email protected]>
Authored: Sat Dec 2 20:36:29 2017 +0300
Committer: sboikov <[email protected]>
Committed: Sat Dec 2 20:36:29 2017 +0300

----------------------------------------------------------------------
 ...arOptimisticSerializableTxPrepareFuture.java |   2 +-
 .../transactions/TxRollbackOnTimeoutTest.java   | 186 ++++++++++++++++++-
 2 files changed, 186 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0d2ebf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1da0589..beb1e16 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -184,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
             }
         }
 
-        if (e instanceof IgniteTxOptimisticCheckedException || e instanceof 
IgniteTxTimeoutCheckedException) {
+        if (e instanceof IgniteTxOptimisticCheckedException) {
             if (m != null)
                 tx.removeMapping(m.primary().id());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0d2ebf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index e1c6c10..6aa3bdd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -18,24 +18,31 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -44,8 +51,11 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.jsr166.LongAdder8;
 
+import static java.lang.Thread.sleep;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -56,6 +66,9 @@ import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  */
 public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
     /** */
+    private static final long DURATION = 60 * 1000L;
+
+    /** */
     private static final long TX_MIN_TIMEOUT = 1;
 
     /** */
@@ -73,6 +86,8 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
         boolean client = "client".equals(igniteInstanceName);
 
         cfg.setClientMode(client);
@@ -373,6 +388,175 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test timeouts with random values and different tx configurations.
+     * <p>
+     * Starts a client node, pre-populates keys {@code 0 .. threadsCnt - 1} with {@code 0L}
+     * through {@code grid(0)} and then runs {@code threadsCnt} worker threads
+     * (twice the number of available processors) until the stop flag is raised.
+     * Each worker iteration picks a random node (the client when the drawn index equals
+     * {@code GRID_CNT} or when {@code nearCacheEnabled()} is true), a random concurrency,
+     * a random isolation, a random key and a random timeout in the range {@code [50, 250)}.
+     * Inside the transaction it reads the key, sleeps for a random delay in the range
+     * {@code [0, 400)}, writes the incremented value and commits.
+     * <p>
+     * Counters: {@code cntr0} - started, {@code cntr1} - committed, {@code cntr2} - timed out
+     * (either a {@code TransactionTimeoutException} or a {@code CacheException} whose cause
+     * is asserted to be exactly that class), {@code cntr3} - failed with
+     * {@code TransactionOptimisticException} or {@code InterruptedException}.
+     * <p>
+     * The main thread sleeps for {@code DURATION}, raises the stop flag, waits for the workers
+     * (timeout argument {@code 10_000}), logs the statistics and asserts that the number of
+     * started transactions equals the sum of the other three counters.
+     * <p>
+     * NOTE(review): a single {@code Random} instance is shared by all worker threads, so the
+     * logged seed alone does not reproduce the sequence of values seen by any given thread.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandomMixedTxConfigurations() throws Exception {
+        final Ignite client = startClient();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final long seed = System.currentTimeMillis();
+
+        final Random r = new Random(seed);
+
+        log.info("Using seed: " + seed);
+
+        final int threadsCnt = Runtime.getRuntime().availableProcessors() * 2;
+
+        for (int k = 0; k < threadsCnt; k++)
+            grid(0).cache(CACHE_NAME).put(k, (long)0);
+
+        final TransactionConcurrency[] TC_VALS = 
TransactionConcurrency.values();
+        final TransactionIsolation[] TI_VALS = TransactionIsolation.values();
+
+        final LongAdder8 cntr0 = new LongAdder8();
+        final LongAdder8 cntr1 = new LongAdder8();
+        final LongAdder8 cntr2 = new LongAdder8();
+        final LongAdder8 cntr3 = new LongAdder8();
+
+        final IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                while (!stop.get()) {
+                    int nodeId = r.nextInt(GRID_CNT + 1);
+
+                    Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? 
client : grid(nodeId);
+
+                    TransactionConcurrency conc = 
TC_VALS[r.nextInt(TC_VALS.length)];
+                    TransactionIsolation isolation = 
TI_VALS[r.nextInt(TI_VALS.length)];
+
+                    int k = r.nextInt(threadsCnt);
+
+                    long timeout = r.nextInt(200) + 50;
+
+                    // Roughly 50% of transactions should time out.
+                    try (Transaction tx = node.transactions().txStart(conc, 
isolation, timeout, 1)) {
+                        cntr0.add(1);
+
+                        final Long v = (Long)node.cache(CACHE_NAME).get(k);
+
+                        final int delay = r.nextInt(400);
+
+                        if (delay > 0)
+                            sleep(delay);
+
+                        node.cache(CACHE_NAME).put(k, v + 1);
+
+                        tx.commit();
+
+                        cntr1.add(1);
+                    }
+                    catch (TransactionOptimisticException | 
InterruptedException e) {
+                        // Expected.
+                        cntr3.add(1);
+                    }
+                    catch (TransactionTimeoutException e) {
+                        cntr2.add(1);
+                    }
+                    catch (CacheException e) {
+                        assertEquals(TransactionTimeoutException.class, 
X.getCause(e).getClass());
+
+                        cntr2.add(1);
+                    }
+                }
+            }
+        }, threadsCnt, "tx-async-thread");
+
+        sleep(DURATION);
+
+        stop.set(true);
+
+        fut.get(10_000);
+
+        log.info("Tx test stats: started=" + cntr0.sum() +
+            ", completed=" + cntr1.sum() +
+            ", failed=" + cntr3.sum() +
+            ", timedOut=" + cntr2.sum());
+
+        assertEquals("Expected finished count same as started count", 
cntr0.sum(), cntr1.sum() + cntr2.sum() + cntr3.sum());
+    }
+
+    /**
+     * Tests timeout on DHT primary node for all tx configurations.
+     * <p>
+     * The primary node is the one that key {@code 0} of {@code CACHE_NAME} maps to according
+     * to the affinity function queried on {@code grid(0)}. The per-configuration check
+     * {@code testTimeoutOnPrimaryDhtNode0} is then invoked once for every combination of
+     * {@code TransactionConcurrency} and {@code TransactionIsolation} values.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTimeoutOnPrimaryDHTNode() throws Exception {
+        final ClusterNode n0 = grid(0).affinity(CACHE_NAME).mapKeyToNode(0);
+
+        final Ignite prim = G.ignite(n0.id());
+
+        for (TransactionConcurrency concurrency : 
TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : 
TransactionIsolation.values())
+                testTimeoutOnPrimaryDhtNode0(prim, concurrency, isolation);
+        }
+    }
+
+    /**
+     * Runs one transaction on the primary node while {@code GridDhtTxPrepareResponse}
+     * messages are blocked and expects it to end with a {@code TransactionTimeoutException}.
+     * <p>
+     * Steps: enable blocking via {@code toggleBlocking}; in a single worker thread start a
+     * transaction on {@code prim} with the given concurrency and isolation and a timeout
+     * argument of {@code 300}, put {@code (0, 0)} into {@code CACHE_NAME} and commit; call
+     * {@code fail()} if no exception was propagated; disable blocking; finally wait on every
+     * started grid for the partition release future of topology version
+     * {@code (GRID_CNT + 1, 0)} (timeout argument {@code 10_000}).
+     * <p>
+     * NOTE(review): blocking is switched off after the try/catch rather than in a
+     * {@code finally} block, so it is not switched off when {@code fail()} fires or when an
+     * exception other than {@code TransactionTimeoutException} is thrown.
+     *
+     * @param prim Primary node.
+     * @param conc Concurrency.
+     * @param isolation Isolation.
+     *
+     * @throws Exception If failed.
+     */
+    private void testTimeoutOnPrimaryDhtNode0(final Ignite prim, final 
TransactionConcurrency conc,
+        final TransactionIsolation isolation)
+        throws Exception {
+
+        log.info("concurrency=" + conc + ", isolation=" + isolation);
+
+        // Force timeout on primary DHT node by blocking DHT prepare response.
+        toggleBlocking(GridDhtTxPrepareResponse.class, prim, true);
+
+        final int val = 0;
+
+        try {
+            multithreaded(new Runnable() {
+                @Override public void run() {
+                    try (Transaction txOpt = prim.transactions().txStart(conc, 
isolation, 300, 1)) {
+
+                        prim.cache(CACHE_NAME).put(val, val);
+
+                        txOpt.commit();
+                    }
+                }
+            }, 1, "tx-async-thread");
+
+            fail();
+        }
+        catch (TransactionTimeoutException e) {
+            // Expected.
+        }
+
+        toggleBlocking(GridDhtTxPrepareResponse.class, prim, false);
+
+        AffinityTopologyVersion topVer = new AffinityTopologyVersion(GRID_CNT 
+ 1, 0);
+
+        for (Ignite ignite : G.allGrids())
+            
((IgniteEx)ignite).context().cache().context().partitionReleaseFuture(topVer).get(10_000);
+    }
+
+    /**
+     * Switches message blocking on or off on the communication SPI of every started grid
+     * except {@code nodeToBlock}.
+     * <p>
+     * When {@code block} is {@code true}, each of those SPIs is asked to block messages of
+     * class {@code cls} for the node named {@code nodeToBlock.name()}. When {@code block} is
+     * {@code false}, {@code stopBlock(true)} is called on each SPI and {@code cls} is not
+     * used (presumably this also releases the messages queued so far - confirm against
+     * {@code TestRecordingCommunicationSpi}).
+     * <p>
+     * The communication SPI of every grid is cast to {@code TestRecordingCommunicationSpi},
+     * so the grids must have been configured with it.
+     *
+     * @param cls Message class.
+     * @param nodeToBlock Node to block.
+     * @param block Block.
+     */
+    private void toggleBlocking(Class<? extends Message> cls, Ignite 
nodeToBlock, boolean block) {
+        for (Ignite ignite : G.allGrids()) {
+            if (ignite == nodeToBlock)
+                continue;
+
+            final TestRecordingCommunicationSpi spi =
+                
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            if (block)
+                spi.blockMessages(cls, nodeToBlock.name());
+            else
+                spi.stopBlock(true);
+        }
+    }
+
+    /**
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param op Operation to test.
@@ -652,4 +836,4 @@ public class TxRollbackOnTimeoutTest extends 
GridCommonAbstractTest {
 
         fut2.get();
     }
-}
\ No newline at end of file
+}

Reply via email to