Repository: ignite Updated Branches: refs/heads/master 0aaaab0d5 -> 33f485aec
IGNITE-9042 Fixed partial transaction state when transaction is timed out - Fixes #4397. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33f485ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33f485ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33f485ae Branch: refs/heads/master Commit: 33f485aecbca59e7ae776145df830565b0dd1ebd Parents: 0aaaab0 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Mon Jul 23 11:25:49 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Jul 23 11:25:49 2018 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 3 - ...WithSmallTimeoutAndContentionOneKeyTest.java | 255 +++++++++++++++++++ .../junits/common/GridCommonAbstractTest.java | 59 +++-- .../testsuites/IgniteCacheTestSuite6.java | 2 + 4 files changed, 299 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 0beff6c..d02b851 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1292,9 +1292,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if 
(F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) continue; - if (tx.remainingTime() == -1) - return; - MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); add(fut); // Append new future. http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java new file mode 100644 index 0000000..93b995e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.PartitionKey; +import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionTimeoutException; + 
+import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class TxWithSmallTimeoutAndContentionOneKeyTest extends GridCommonAbstractTest { + /** */ + public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int TIME_TO_EXECUTE = 30 * 1000; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setConsistentId("NODE_" + name.substring(name.length() - 1)); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setCacheConfiguration( + new CacheConfiguration(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(3) + ); + + if (client){ + cfg.setConsistentId("Client"); + + cfg.setClientMode(client); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @return Random transaction type. 
+ */ + protected TransactionConcurrency transactionConcurrency() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + return random.nextBoolean() ? OPTIMISTIC : PESSIMISTIC; + } + + /** + * @return Random transaction isolation level. + */ + protected TransactionIsolation transactionIsolation(){ + ThreadLocalRandom random = ThreadLocalRandom.current(); + + switch (random.nextInt(3)) { + case 0: + return READ_COMMITTED; + case 1: + return REPEATABLE_READ; + case 2: + return SERIALIZABLE; + default: + throw new UnsupportedOperationException(); + } + } + + /** + * @return Random timeout. + */ + protected long randomTimeOut() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + return random.nextLong(5, 20); + } + + /** + * https://issues.apache.org/jira/browse/IGNITE-9042 + * + * @throws Exception If failed. + */ + public void test() throws Exception { + startGrids(4); + + client = true; + + IgniteEx igClient = startGrid(4); + + igClient.cluster().active(true); + + AtomicBoolean stop = new AtomicBoolean(false); + + IgniteCache<Integer, Long> cache = igClient.cache(DEFAULT_CACHE_NAME); + + int threads = 1; + + int keyId = 0; + + CountDownLatch finishLatch = new CountDownLatch(threads); + + AtomicLong cnt = new AtomicLong(); + + IgniteInternalFuture<Long> f = runMultiThreadedAsync(() -> { + IgniteTransactions txMgr = igClient.transactions(); + + while (!stop.get()) { + long newVal = cnt.getAndIncrement(); + + TransactionConcurrency concurrency = transactionConcurrency(); + + TransactionIsolation transactionIsolation = transactionIsolation(); + + try (Transaction tx = txMgr.txStart(concurrency, transactionIsolation, randomTimeOut(), 1)) { + cache.put(keyId, newVal); + + tx.commit(); + } + catch (Throwable e) { + // Ignore. + } + } + + finishLatch.countDown(); + + }, threads, "tx-runner"); + + runAsync(() -> { + try { + Thread.sleep(TIME_TO_EXECUTE); + } + catch (InterruptedException ignore) { + // Ignore. 
+ } + + stop.set(true); + }); + + finishLatch.await(); + + f.get(); + + IdleVerifyResultV2 idleVerifyResult = idleVerify(igClient, DEFAULT_CACHE_NAME); + + log.info("Current counter value:" + cnt.get()); + + Long val = cache.get(keyId); + + log.info("Last commited value:" + val); + + if (idleVerifyResult.hasConflicts()){ + SB sb = new SB(); + + sb.a("\n"); + + buildConflicts("Hash conflicts:\n", sb, idleVerifyResult.hashConflicts()); + buildConflicts("Counters conflicts:\n", sb, idleVerifyResult.counterConflicts()); + + System.out.println(sb); + + fail(); + } + } + + /** + * @param msg Header message. + * @param conflicts Conflicts map. + * @param sb String builder. + */ + private void buildConflicts(String msg, SB sb, Map<PartitionKeyV2, List<PartitionHashRecordV2>> conflicts) { + sb.a(msg); + + for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : conflicts.entrySet()) { + sb.a(entry.getKey()).a("\n"); + + for (PartitionHashRecordV2 rec : entry.getValue()) + sb.a("\t").a(rec).a("\n"); + } + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 967bdc1..33eae86 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTask; @@ -98,6 +99,11 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTask; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskResult; +import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; @@ -792,23 +798,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** - * Compares checksums between primary and backup partitions of specified caches. - * Works properly only on idle cluster - there may be false positive conflict reports if data in cluster is being - * concurrently updated. - * - * @param ig Ignite instance. - * @param cacheNames Cache names (if null, all user caches will be verified). - * @throws IgniteCheckedException If checksum conflict has been found. 
- */ - protected void verifyBackupPartitions(Ignite ig, Set<String> cacheNames) throws IgniteCheckedException { - Map<PartitionKey, List<PartitionHashRecord>> conflicts = ig.compute().execute( - new VerifyBackupPartitionsTask(), cacheNames); - - if (!conflicts.isEmpty()) - throw new IgniteCheckedException("Conflict partitions: " + conflicts.keySet()); - } - - /** * @param top Topology. * @param topVer Version to wait for. * @throws Exception If failed. @@ -1948,4 +1937,40 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { dbMgr.waitForCheckpoint("test"); } } + + /** + * Compares checksums between primary and backup partitions of specified caches. + * Works properly only on idle cluster - there may be false positive conflict reports if data in cluster is being + * concurrently updated. + * + * @param ig Ignite instance. + * @param caches Cache names (if null, all user caches will be verified). + * @return Conflicts result. + * @throws IgniteException If none caches or node found. + */ + protected IdleVerifyResultV2 idleVerify(Ignite ig, String... caches) { + IgniteEx ig0 = (IgniteEx)ig; + + Set<String> cacheNames = new HashSet<>(); + + if (F.isEmpty(caches)) + cacheNames.addAll(ig0.cacheNames()); + else + Collections.addAll(cacheNames, caches); + + if (cacheNames.isEmpty()) + throw new IgniteException("None cache for checking."); + + ClusterNode node = !ig0.localNode().isClient() ? 
ig0.localNode() : ig0.cluster().forServers().forRandom().node(); + + if (node == null) + throw new IgniteException("None server node for verification."); + + VisorIdleVerifyTaskArg taskArg = new VisorIdleVerifyTaskArg(cacheNames); + + return ig.compute().execute( + VisorIdleVerifyTaskV2.class.getName(), + new VisorTaskArgument<>(node.id(), taskArg, false) + ); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/33f485ae/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 66c1c48..77285be 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTime import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest; +import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest; /** * Test suite. @@ -78,6 +79,7 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(TxRollbackOnTimeoutTest.class); suite.addTestSuite(TxRollbackOnTimeoutNoDeadlockDetectionTest.class); suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class); + suite.addTestSuite(TxWithSmallTimeoutAndContentionOneKeyTest.class); suite.addTestSuite(IgniteCacheThreadLocalTxTest.class); suite.addTestSuite(TxRollbackAsyncTest.class); suite.addTestSuite(TxRollbackAsyncNearCacheTest.class);