Repository: ignite Updated Branches: refs/heads/master fe8c8cc58 -> 0e3404281
IGNITE-10080 Optimized Cache 6 long-running tests - Fixes #5243. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e340428 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e340428 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e340428 Branch: refs/heads/master Commit: 0e3404281666668741fb8600d6e9021cccabe6d3 Parents: fe8c8cc Author: pereslegin-pa <[email protected]> Authored: Wed Nov 28 15:51:15 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Nov 28 15:51:15 2018 +0300 ---------------------------------------------------------------------- .../distributed/CacheExchangeMergeTest.java | 4 +- ...ptimisticTxSuspendResumeMultiServerTest.java | 30 --- .../IgniteOptimisticTxSuspendResumeTest.java | 228 +++++++++++++------ ...OptimisticPrepareOnUnstableTopologyTest.java | 164 ++++++------- .../transactions/TxRollbackOnTimeoutTest.java | 3 +- .../ignite/testframework/GridTestUtils.java | 6 +- .../testsuites/IgniteCacheTestSuite6.java | 2 - 7 files changed, 238 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 03ea539..2dad0b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -419,7 +419,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void concurrentStart(final boolean withClients) throws Exception { - for (int i = 0; i < 5; i++) { + int iterations = GridTestUtils.SF.applyLB(5, 1); + + for (int i = 0; i < iterations; i++) { log.info("Iteration: " + i); startGrid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java deleted file mode 100644 index b7003d4..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -/** - * - */ -public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest { - /** - * @return Number of server nodes. - */ - @Override protected int serversNumber() { - return 4; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java index 66c204d..73a7dc1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -19,17 +19,21 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.PA; @@ -62,7 +66,13 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest private static final int FUT_TIMEOUT = 5000; /** */ - private boolean client = false; + private static final int CLIENT_CNT = 2; + + /** */ + private static final int SERVER_CNT = 4; + + /** */ + private static final int GRID_CNT = CLIENT_CNT + SERVER_CNT; /** * List of closures to execute transaction operation that prohibited in suspended state. @@ -109,6 +119,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + int idx = getTestIgniteInstanceIndex(igniteInstanceName); + + boolean client = idx >= SERVER_CNT && idx < GRID_CNT; + cfg.setClientMode(client); return cfg; @@ -118,16 +132,21 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - startGrids(serversNumber()); + startGridsMultiThreaded(gridCount()); + } - if (serversNumber() > 1) { - client = true; + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); - startGrid(serversNumber()); + Ignite client = ignite(gridCount() - 1); - startGrid(serversNumber() + 1); + assertTrue(client.cluster().localNode().isClient()); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + grid(0).createCache(ccfg); - client = false; + client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); } awaitPartitionMapExchange(); @@ -138,11 +157,19 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest stopAllGrids(true); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (CacheConfiguration ccfg : cacheConfigurations()) + ignite(0).destroyCache(ccfg.getName()); + + super.afterTest(); + } + /** * @return Number of server nodes. */ - protected int serversNumber() { - return 1; + protected int gridCount() { + return GRID_CNT; } /** @@ -215,8 +242,8 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() { @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception { for (TransactionIsolation isolation : TransactionIsolation.values()) { - final IgniteCache<Integer, Integer> otherCache = - ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache")); + final IgniteCache<Integer, Integer> otherCache = ignite.getOrCreateCache( + cacheConfiguration("otherCache", PARTITIONED, 0, false)); final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); @@ -435,10 +462,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest tx.suspend(); - long start = U.currentTimeMillis(); - - while(TX_TIMEOUT >= U.currentTimeMillis() - start) - Thread.sleep(TX_TIMEOUT * 2); + U.sleep(TX_TIMEOUT * 2); GridTestUtils.assertThrowsWithCause(new Callable<Object>() { @Override public Object call() throws Exception { @@ -475,10 +499,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest cache.put(1, 1); - long start = U.currentTimeMillis(); - - while(TX_TIMEOUT >= U.currentTimeMillis() - start) - Thread.sleep(TX_TIMEOUT * 2); + U.sleep(TX_TIMEOUT * 2); GridTestUtils.assertThrowsWithCause(new Callable<Object>() { @Override public Object call() throws Exception { @@ -599,33 +620,92 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest * @throws Exception If failed. */ public void testSuspendTxAndResumeAfterTopologyChange() throws Exception { - executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() { - @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception { + Ignite srv = ignite(ThreadLocalRandom.current().nextInt(SERVER_CNT)); + Ignite client = ignite(SERVER_CNT); + Ignite clientNear = ignite(SERVER_CNT + 1); + + Map<String, List<List<Integer>>> cacheKeys = generateKeys(srv, TransactionIsolation.values().length); + + doCheckSuspendTxAndResume(srv, cacheKeys); + doCheckSuspendTxAndResume(client, cacheKeys); + doCheckSuspendTxAndResume(clientNear, cacheKeys); + } + + /** + * @param node Ignite isntance. + * @param cacheKeys Different key types mapped to cache name. + * @throws Exception If failed. + */ + private void doCheckSuspendTxAndResume(Ignite node, Map<String, List<List<Integer>>> cacheKeys) throws Exception { + ClusterNode locNode = node.cluster().localNode(); + + log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']'); + + Map<IgniteCache<Integer, Integer>, Map<Transaction, Integer>> cacheTxMap = new IdentityHashMap<>(); + + for (Map.Entry<String, List<List<Integer>>> cacheKeysEntry : cacheKeys.entrySet()) { + String cacheName = cacheKeysEntry.getKey(); + + IgniteCache<Integer, Integer> cache = node.cache(cacheName); + + Map<Transaction, Integer> suspendedTxs = new IdentityHashMap<>(); + + for (List<Integer> keysList : cacheKeysEntry.getValue()) { for (TransactionIsolation isolation : TransactionIsolation.values()) { - Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + Transaction tx = node.transactions().txStart(OPTIMISTIC, isolation); - cache.put(1, 1); + int key = keysList.get(isolation.ordinal()); + + cache.put(key, key); tx.suspend(); - assertEquals(SUSPENDED, tx.state()); + suspendedTxs.put(tx, key); - try (IgniteEx g = startGrid(serversNumber() + 3)) { - tx.resume(); + String msg = "node=" + node.cluster().localNode() + + ", cache=" + cacheName + ", isolation=" + isolation + ", key=" + key; - assertEquals(ACTIVE, tx.state()); + assertEquals(msg, SUSPENDED, tx.state()); + } + } - assertEquals(1, (int)cache.get(1)); + cacheTxMap.put(cache, suspendedTxs); + } - tx.commit(); + int newNodeIdx = gridCount(); - assertEquals(1, (int)cache.get(1)); - } + startGrid(newNodeIdx); - cache.removeAll(); + try { + for (Map.Entry<IgniteCache<Integer, Integer>, Map<Transaction, Integer>> entry : cacheTxMap.entrySet()) { + IgniteCache<Integer, Integer> cache = entry.getKey(); + + for (Map.Entry<Transaction, Integer> suspendedTx : entry.getValue().entrySet()) { + Transaction tx = suspendedTx.getKey(); + + Integer key = suspendedTx.getValue(); + + tx.resume(); + + String msg = "node=" + node.cluster().localNode() + + ", cache=" + cache.getName() + ", isolation=" + tx.isolation() + ", key=" + key; + + assertEquals(msg, ACTIVE, tx.state()); + + assertEquals(msg, key, cache.get(key)); + + tx.commit(); + + assertEquals(msg, key, cache.get(key)); } } - }); + } + finally { + stopGrid(newNodeIdx); + + for (IgniteCache<Integer, Integer> cache : cacheTxMap.keySet()) + cache.removeAll(); + } } /** @@ -666,10 +746,10 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>(); - cfgs.add(cacheConfiguration(PARTITIONED, 0, false)); - cfgs.add(cacheConfiguration(PARTITIONED, 1, false)); - cfgs.add(cacheConfiguration(PARTITIONED, 1, true)); - cfgs.add(cacheConfiguration(REPLICATED, 0, false)); + cfgs.add(cacheConfiguration("cache1", PARTITIONED, 0, false)); + cfgs.add(cacheConfiguration("cache2", PARTITIONED, 1, false)); + cfgs.add(cacheConfiguration("cache3", PARTITIONED, 1, true)); + cfgs.add(cacheConfiguration("cache4", REPLICATED, 0, false)); return cfgs; } @@ -681,10 +761,11 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest * @return Cache configuration. */ private CacheConfiguration<Integer, Integer> cacheConfiguration( + String name, CacheMode cacheMode, int backups, boolean nearCache) { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(name); ccfg.setCacheMode(cacheMode); ccfg.setAtomicityMode(TRANSACTIONAL); @@ -701,37 +782,56 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest /** * @param c Closure. - * @throws Exception If failed. */ - private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) throws Exception { - for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { - ignite(0).createCache(ccfg); + private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) { + for (int i = 0; i < gridCount(); i++) { + Ignite ignite = ignite(i); - log.info("Run test for cache [cache=" + ccfg.getCacheMode() + - ", backups=" + ccfg.getBackups() + - ", near=" + (ccfg.getNearConfiguration() != null) + "]"); + ClusterNode locNode = ignite.cluster().localNode(); - awaitPartitionMapExchange(); + log.info("Run test for node [node=" + locNode.id() + ", client=" + locNode.isClient() + ']'); - int srvNum = serversNumber(); - if (serversNumber() > 1) { - ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); - srvNum += 2; - } + for (CacheConfiguration ccfg : cacheConfigurations()) + c.apply(ignite, ignite.cache(ccfg.getName())); + } + } - try { - for (int i = 0; i < srvNum; i++) { - Ignite ignite = ignite(i); + /** + * Generates list of keys (primary, backup and neither primary nor backup). + * + * @param ignite Ignite instance. + * @param keysCnt The number of keys generated for each type of key. + * @return List of different keys mapped to cache name. + */ + private Map<String, List<List<Integer>>> generateKeys(Ignite ignite, int keysCnt) { + Map<String, List<List<Integer>>> cacheKeys = new HashMap<>(); - log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']'); + for (CacheConfiguration cfg : cacheConfigurations()) { + String cacheName = cfg.getName(); - c.apply(ignite, ignite.<Integer, Integer>cache(ccfg.getName())); - } - } - finally { - ignite(0).destroyCache(ccfg.getName()); + IgniteCache cache = ignite.cache(cacheName); + + List<List<Integer>> keys = new ArrayList<>(); + + // Generate different keys: 0 - primary, 1 - backup, 2 - neither primary nor backup. + for (int type = 0; type < 3; type++) { + if (type == 1 && cfg.getCacheMode() == PARTITIONED && cfg.getBackups() == 0) + continue; + + if (type == 2 && cfg.getCacheMode() == REPLICATED) + continue; + + List<Integer> keys0 = findKeys(cache, keysCnt, type * 100_000, type); + + assertEquals(cacheName, keysCnt, keys0.size()); + + keys.add(keys0); } + + cacheKeys.put(cacheName, keys); } + + return cacheKeys; } /** @@ -750,7 +850,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest */ public abstract void applyx(E1 e1, E2 e2) throws Exception; - /** {@inheritdoc} */ + /** {@inheritDoc} */ @Override public void apply(E1 e1, E2 e2) { try { applyx(e1, e2); @@ -775,7 +875,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest */ public abstract void applyx(T o) throws Exception; - /** {@inheritdoc} */ + /** {@inheritDoc} */ @Override public void apply(T o) { try { applyx(o); @@ -797,7 +897,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest */ public abstract void runx() throws Exception; - /** {@inheritdoc} */ + /** {@inheritDoc} */ @Override public void run() { try { runx(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java index 21dcf90..cbdcffe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticPrepareOnUnstableTopologyTest.java @@ -17,22 +17,23 @@ package org.apache.ignite.internal.processors.cache.transactions; -import java.util.ArrayList; -import java.util.Collection; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -49,21 +50,17 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac /** */ public static final String CACHE_NAME = "part_cache"; - /** IP finder. */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ - private volatile boolean run = true; + private static final int STARTUP_DELAY = 500; /** */ - private boolean client; + private static final int GRID_CNT = 4; - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - assertEquals(0, G.allGrids().size()); - } + /** */ + private boolean client; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -110,58 +107,42 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac */ private void doPrepareOnUnstableTopology(int keys, boolean testClient, TransactionIsolation isolation, long timeout) throws Exception { - Collection<Thread> threads = new ArrayList<>(); - - try { - // Start grid 1. - IgniteEx grid1 = startGrid(0); - - assertFalse(grid1.configuration().isClientMode()); - - threads.add(runCacheOperations(grid1, isolation, timeout, keys)); - - TimeUnit.SECONDS.sleep(3L); - - client = testClient; // If test client start on node in client mode. - - // Start grid 2. - IgniteEx grid2 = startGrid(1); - - assertEquals((Object)testClient, grid2.configuration().isClientMode()); + GridCompoundFuture<Void, Object> compFut = new GridCompoundFuture<>(); - client = false; + AtomicBoolean stopFlag = new AtomicBoolean(); - threads.add(runCacheOperations(grid2, isolation, timeout, keys)); - - TimeUnit.SECONDS.sleep(3L); - - // Start grid 3. - IgniteEx grid3 = startGrid(2); + try { + int clientIdx = testClient ? 1 : -1; - assertFalse(grid3.configuration().isClientMode()); + try { + for (int i = 0; i < GRID_CNT; i++) { + client = (clientIdx == i); - if (testClient) - log.info("Started client node: " + grid3.name()); + IgniteEx grid = startGrid(i); - threads.add(runCacheOperations(grid3, isolation, timeout, keys)); + assertEquals(client, grid.configuration().isClientMode().booleanValue()); - TimeUnit.SECONDS.sleep(3L); + client = false; - // Start grid 4. - IgniteEx grid4 = startGrid(3); + IgniteInternalFuture<Void> fut = runCacheOperationsAsync(grid, stopFlag, isolation, timeout, keys); - assertFalse(grid4.configuration().isClientMode()); + compFut.add(fut); - threads.add(runCacheOperations(grid4, isolation, timeout, keys)); + U.sleep(STARTUP_DELAY); + } + } + finally { + stopFlag.set(true); + } - TimeUnit.SECONDS.sleep(3L); + compFut.markInitialized(); - stopThreads(threads); + compFut.get(); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < GRID_CNT; i++) { IgniteTxManager tm = ((IgniteKernal)grid(i)).internalCache(CACHE_NAME).context().tm(); - assertEquals("txMap is not empty:" + i, 0, tm.idMapSize()); + assertEquals("txMap is not empty: " + i, 0, tm.idMapSize()); } } finally { @@ -170,63 +151,50 @@ public class TxOptimisticPrepareOnUnstableTopologyTest extends GridCommonAbstrac } /** - * @param threads Thread which will be stopped. - */ - private void stopThreads(Iterable<Thread> threads) { - try { - run = false; - - for (Thread thread : threads) - thread.join(); - } - catch (Exception e) { - U.error(log(), "Couldn't stop threads.", e); - } - } - - /** * @param node Node. * @param isolation Isolation. * @param timeout Timeout. * @param keys Number of keys. - * @return Running thread. + * @return Future representing pending completion of the operation. */ - private Thread runCacheOperations(Ignite node, TransactionIsolation isolation, long timeout, final int keys) { - Thread t = new Thread() { - @Override public void run() { - while (run) { - TreeMap<Integer, String> vals = generateValues(keys); - - try { - try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation, - timeout, keys)){ - - IgniteCache<Object, Object> cache = node.cache(CACHE_NAME); - - // Put or remove. - if (ThreadLocalRandom.current().nextDouble(1) < 0.65) - cache.putAll(vals); - else - cache.removeAll(vals.keySet()); - - tx.commit(); - } - catch (Exception e) { - U.error(log(), "Failed cache operation.", e); - } - - U.sleep(100); + private IgniteInternalFuture<Void> runCacheOperationsAsync( + Ignite node, + AtomicBoolean stopFlag, + TransactionIsolation isolation, + long timeout, + final int keys + ) { + return GridTestUtils.runAsync(() -> { + while (!stopFlag.get()) { + TreeMap<Integer, String> vals = generateValues(keys); + + try { + try (Transaction tx = node.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation, + timeout, keys)) { + + IgniteCache<Object, Object> cache = node.cache(CACHE_NAME); + + // Put or remove. + if (ThreadLocalRandom.current().nextDouble(1) < 0.65) + cache.putAll(vals); + else + cache.removeAll(vals.keySet()); + + tx.commit(); } - catch (Exception e){ - U.error(log(), "Failed unlock.", e); + catch (Exception e) { + U.error(log(), "Failed cache operation.", e); } + + U.sleep(100); + } + catch (Exception e) { + U.error(log(), "Failed unlock.", e); } } - }; - - t.start(); - return t; + return null; + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index 177444d..61e39ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.GridTestUtils.SF; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -78,7 +79,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; */ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { /** */ - private static final long DURATION = 60 * 1000L; + private static final long DURATION = SF.apply(60 * 1000); /** */ private static final long TX_MIN_TIMEOUT = 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index dad5344..ee25b7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -2024,7 +2024,7 @@ public final class GridTestUtils { /** */ public static int apply(int val) { - return (int) (TEST_SCALE_FACTOR_VALUE * val); + return (int) Math.round(TEST_SCALE_FACTOR_VALUE * val); } /** */ @@ -2034,12 +2034,12 @@ public final class GridTestUtils { /** Apply scale factor with lower bound */ public static int applyLB(int val, int lowerBound) { - return Math.max((int) (TEST_SCALE_FACTOR_VALUE * val), lowerBound); + return Math.max(apply(val), lowerBound); } /** Apply scale factor with upper bound */ public static int applyUB(int val, int upperBound) { - return Math.min((int) (TEST_SCALE_FACTOR_VALUE * val), upperBound); + return Math.min(apply(val), upperBound); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e340428/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 7bb476f..03cfb9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMulti import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest; -import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest; @@ -64,7 +63,6 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(GridCachePartitionEvictionDuringReadThroughSelfTest.class); suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class); - suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class); suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class); suite.addTestSuite(CacheExchangeMergeTest.class);
