Repository: ignite
Updated Branches:
  refs/heads/master 0aaaab0d5 -> 33f485aec


IGNITE-9042 Fixed partial tranasaction state wheh transaction is timed out - 
Fixes #4397.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33f485ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33f485ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33f485ae

Branch: refs/heads/master
Commit: 33f485aecbca59e7ae776145df830565b0dd1ebd
Parents: 0aaaab0
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Mon Jul 23 11:25:49 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Jul 23 11:25:49 2018 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |   3 -
 ...WithSmallTimeoutAndContentionOneKeyTest.java | 255 +++++++++++++++++++
 .../junits/common/GridCommonAbstractTest.java   |  59 +++--
 .../testsuites/IgniteCacheTestSuite6.java       |   2 +
 4 files changed, 299 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 0beff6c..d02b851 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1292,9 +1292,6 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
             if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
                 continue;
 
-            if (tx.remainingTime() == -1)
-                return;
-
             MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, 
nearMapping);
 
             add(fut); // Append new future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
new file mode 100644
index 0000000..93b995e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class TxWithSmallTimeoutAndContentionOneKeyTest extends 
GridCommonAbstractTest {
+    /** */
+    public static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int TIME_TO_EXECUTE = 30 * 1000;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId("NODE_" + name.substring(name.length() - 1));
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setBackups(3)
+        );
+
+        if (client){
+            cfg.setConsistentId("Client");
+
+            cfg.setClientMode(client);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @return Random transaction type.
+     */
+    protected TransactionConcurrency transactionConcurrency() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        return random.nextBoolean() ? OPTIMISTIC : PESSIMISTIC;
+    }
+
+    /**
+     * @return Random transaction isolation level.
+     */
+    protected TransactionIsolation transactionIsolation(){
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        switch (random.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+            case 1:
+                return REPEATABLE_READ;
+            case 2:
+                return SERIALIZABLE;
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * @return Random timeout.
+     */
+    protected long randomTimeOut() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        return random.nextLong(5, 20);
+    }
+
+    /**
+     * https://issues.apache.org/jira/browse/IGNITE-9042
+     *
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        startGrids(4);
+
+        client = true;
+
+        IgniteEx igClient = startGrid(4);
+
+        igClient.cluster().active(true);
+
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        IgniteCache<Integer, Long> cache = igClient.cache(DEFAULT_CACHE_NAME);
+
+        int threads = 1;
+
+        int keyId = 0;
+
+        CountDownLatch finishLatch = new CountDownLatch(threads);
+
+        AtomicLong cnt = new AtomicLong();
+
+        IgniteInternalFuture<Long> f = runMultiThreadedAsync(() -> {
+            IgniteTransactions txMgr = igClient.transactions();
+
+            while (!stop.get()) {
+                long newVal = cnt.getAndIncrement();
+
+                TransactionConcurrency concurrency = transactionConcurrency();
+
+                TransactionIsolation transactionIsolation = 
transactionIsolation();
+
+                try (Transaction tx = txMgr.txStart(concurrency, 
transactionIsolation, randomTimeOut(), 1)) {
+                    cache.put(keyId, newVal);
+
+                    tx.commit();
+                }
+                catch (Throwable e) {
+                  // Ignore.
+                }
+            }
+
+            finishLatch.countDown();
+
+        }, threads, "tx-runner");
+
+        runAsync(() -> {
+            try {
+                Thread.sleep(TIME_TO_EXECUTE);
+            }
+            catch (InterruptedException ignore) {
+                // Ignore.
+            }
+
+            stop.set(true);
+        });
+
+        finishLatch.await();
+
+        f.get();
+
+        IdleVerifyResultV2 idleVerifyResult = idleVerify(igClient, 
DEFAULT_CACHE_NAME);
+
+        log.info("Current counter value:" + cnt.get());
+
+        Long val = cache.get(keyId);
+
+        log.info("Last commited value:" + val);
+
+        if (idleVerifyResult.hasConflicts()){
+            SB sb = new SB();
+
+            sb.a("\n");
+
+            buildConflicts("Hash conflicts:\n", sb, 
idleVerifyResult.hashConflicts());
+            buildConflicts("Counters conflicts:\n", sb, 
idleVerifyResult.counterConflicts());
+
+            System.out.println(sb);
+
+            fail();
+        }
+    }
+
+    /**
+     * @param msg Header message.
+     * @param conflicts Conflicts map.
+     * @param sb String builder.
+     */
+    private void buildConflicts(String msg, SB sb, Map<PartitionKeyV2, 
List<PartitionHashRecordV2>> conflicts) {
+        sb.a(msg);
+
+        for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : 
conflicts.entrySet()) {
+            sb.a(entry.getKey()).a("\n");
+
+            for (PartitionHashRecordV2 rec : entry.getValue())
+                sb.a("\t").a(rec).a("\n");
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 967bdc1..33eae86 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -88,6 +88,7 @@ import 
org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
 import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import 
org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTask;
@@ -98,6 +99,11 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask;
+import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
+import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult;
+import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -792,23 +798,6 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     }
 
     /**
-     * Compares checksums between primary and backup partitions of specified 
caches.
-     * Works properly only on idle cluster - there may be false positive 
conflict reports if data in cluster is being
-     * concurrently updated.
-     *
-     * @param ig Ignite instance.
-     * @param cacheNames Cache names (if null, all user caches will be 
verified).
-     * @throws IgniteCheckedException If checksum conflict has been found.
-     */
-    protected void verifyBackupPartitions(Ignite ig, Set<String> cacheNames) 
throws IgniteCheckedException {
-        Map<PartitionKey, List<PartitionHashRecord>> conflicts = 
ig.compute().execute(
-            new VerifyBackupPartitionsTask(), cacheNames);
-
-        if (!conflicts.isEmpty())
-            throw new IgniteCheckedException("Conflict partitions: " + 
conflicts.keySet());
-    }
-
-    /**
      * @param top Topology.
      * @param topVer Version to wait for.
      * @throws Exception If failed.
@@ -1948,4 +1937,40 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
             dbMgr.waitForCheckpoint("test");
         }
     }
+
+    /**
+     * Compares checksums between primary and backup partitions of specified 
caches.
+     * Works properly only on idle cluster - there may be false positive 
conflict reports if data in cluster is being
+     * concurrently updated.
+     *
+     * @param ig Ignite instance.
+     * @param caches Cache names (if null, all user caches will be verified).
+     * @return Conflicts result.
+     * @throws IgniteException If none caches or node found.
+     */
+    protected IdleVerifyResultV2 idleVerify(Ignite ig, String... caches) {
+        IgniteEx ig0 = (IgniteEx)ig;
+
+        Set<String> cacheNames = new HashSet<>();
+
+        if (F.isEmpty(caches))
+            cacheNames.addAll(ig0.cacheNames());
+        else
+            Collections.addAll(cacheNames, caches);
+
+        if (cacheNames.isEmpty())
+            throw new IgniteException("None cache for checking.");
+
+        ClusterNode node = !ig0.localNode().isClient() ? ig0.localNode() : 
ig0.cluster().forServers().forRandom().node();
+
+        if (node == null)
+            throw new IgniteException("None server node for verification.");
+
+        VisorIdleVerifyTaskArg taskArg = new 
VisorIdleVerifyTaskArg(cacheNames);
+
+        return ig.compute().execute(
+            VisorIdleVerifyTaskV2.class.getName(),
+            new VisorTaskArgument<>(node.id(), taskArg, false)
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 66c1c48..77285be 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTime
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
 
 /**
  * Test suite.
@@ -78,6 +79,7 @@ public class IgniteCacheTestSuite6 extends TestSuite {
         suite.addTestSuite(TxRollbackOnTimeoutTest.class);
         suite.addTestSuite(TxRollbackOnTimeoutNoDeadlockDetectionTest.class);
         suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class);
+        suite.addTestSuite(TxWithSmallTimeoutAndContentionOneKeyTest.class);
         suite.addTestSuite(IgniteCacheThreadLocalTxTest.class);
         suite.addTestSuite(TxRollbackAsyncTest.class);
         suite.addTestSuite(TxRollbackAsyncNearCacheTest.class);

Reply via email to