Repository: ignite Updated Branches: refs/heads/ignite-843 316b9421a -> 852772cf8
Added test. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67699564 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67699564 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67699564 Branch: refs/heads/ignite-843 Commit: 67699564d594647236552450d362be1e04c3d476 Parents: 3aa9ea3 Author: sboikov <[email protected]> Authored: Wed Sep 2 15:25:15 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 2 15:34:57 2015 +0300 ---------------------------------------------------------------------- .../CachePutAllFailoverAbstractTest.java | 110 ++++++++++++++----- ...gniteCachePutRetryTransactionalSelfTest.java | 42 ------- .../junits/common/GridCommonAbstractTest.java | 41 +++++++ 3 files changed, 123 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/67699564/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePutAllFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePutAllFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePutAllFailoverAbstractTest.java index 62fddda..f558ba0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePutAllFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePutAllFailoverAbstractTest.java @@ -17,7 +17,15 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; @@ -29,14 +37,6 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.NotNull; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.apache.ignite.cache.CacheMode.PARTITIONED; /** @@ -85,21 +85,31 @@ public abstract class CachePutAllFailoverAbstractTest extends GridCacheAbstractS * @throws Exception If failed. */ public void testPutAllFailover() throws Exception { - testPutAllFailover(false); + testPutAllFailover(Test.PUT_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverPessimisticTx() throws Exception { + if (atomicityMode() == CacheAtomicityMode.ATOMIC) + return; + + testPutAllFailover(Test.PUT_ALL_PESSIMISTIC_TX); } /** * @throws Exception If failed. */ public void testPutAllFailoverAsync() throws Exception { - testPutAllFailover(true); + testPutAllFailover(Test.PUT_ALL_ASYNC); } /** - * @param async If {@code true} tests asynchronous operation. + * @param test Test type * @throws Exception If failed. */ - private void testPutAllFailover(final boolean async) throws Exception { + private void testPutAllFailover(final Test test) throws Exception { final AtomicBoolean finished = new AtomicBoolean(); final long endTime = System.currentTimeMillis() + TEST_TIME; @@ -123,7 +133,7 @@ public abstract class CachePutAllFailoverAbstractTest extends GridCacheAbstractS try { IgniteCache<TestKey, TestValue> cache0 = ignite(0).cache(null); - final IgniteCache<TestKey, TestValue> cache = async ? cache0.withAsync() : cache0; + final IgniteCache<TestKey, TestValue> cache = test == Test.PUT_ALL_ASYNC ? cache0.withAsync() : cache0; GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { @@ -142,10 +152,8 @@ public abstract class CachePutAllFailoverAbstractTest extends GridCacheAbstractS lastInfo = time; } - if (async) { - Collection<IgniteFuture<?>> futs = new ArrayList<>(); - - for (int i = 0 ; i < 10; i++) { + switch (test) { + case PUT_ALL: { TreeMap<TestKey, TestValue> map = new TreeMap<>(); for (int k = 0; k < 100; k++) @@ -153,23 +161,55 @@ public abstract class CachePutAllFailoverAbstractTest extends GridCacheAbstractS cache.putAll(map); - IgniteFuture<?> fut = cache.future(); + break; + } + + case PUT_ALL_ASYNC: { + Collection<IgniteFuture<?>> futs = new ArrayList<>(); - assertNotNull(fut); + for (int i = 0 ; i < 10; i++) { + TreeMap<TestKey, TestValue> map = new TreeMap<>(); - futs.add(fut); + for (int k = 0; k < 100; k++) + map.put(new TestKey(rnd.nextInt(200)), new TestValue(iter)); + + cache.putAll(map); + + IgniteFuture<?> fut = cache.future(); + + assertNotNull(fut); + + futs.add(fut); + } + + for (IgniteFuture<?> fut : futs) + fut.get(); + + break; } - for (IgniteFuture<?> fut : futs) - fut.get(); - } - else { - TreeMap<TestKey, TestValue> map = new TreeMap<>(); + case PUT_ALL_PESSIMISTIC_TX: { + final TreeMap<TestKey, TestValue> map = new TreeMap<>(); + + for (int k = 0; k < 100; k++) + map.put(new TestKey(rnd.nextInt(200)), new TestValue(iter)); + + doInTransaction(ignite(0), new Callable<Object>() { + @Override public Object call() throws Exception { + for (TestKey key : map.keySet()) + cache.get(key); + + cache.putAll(map); - for (int k = 0; k < 100; k++) - map.put(new TestKey(rnd.nextInt(200)), new TestValue(iter)); + return null; + } + }); - cache.putAll(map); + break; + } + + default: + assert false; } iter++; @@ -277,4 +317,18 @@ public abstract class CachePutAllFailoverAbstractTest extends GridCacheAbstractS return S.toString(TestValue.class, this); } } + + /** + * + */ + private enum Test { + /** */ + PUT_ALL, + + /** */ + PUT_ALL_ASYNC, + + /** */ + PUT_ALL_PESSIMISTIC_TX + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/67699564/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 997848b..7c66efc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; -import javax.cache.CacheException; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; @@ -32,18 +31,12 @@ import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; -import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionRollbackException; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * @@ -171,41 +164,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** - * @param ignite Ignite instance. - * @param clo Closure. - * @return Result of closure execution. - * @throws Exception If failed. - */ - private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { - while (true) { - try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - T res = clo.call(); - - tx.commit(); - - return res; - } - catch (CacheException e) { - if (e.getCause() instanceof ClusterTopologyException) { - ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); - - topEx.retryReadyFuture().get(); - } - else - throw e; - } - catch (ClusterTopologyException e) { - IgniteFuture<?> fut = e.retryReadyFuture(); - - fut.get(); - } - catch (TransactionRollbackException ignore) { - // Safe to retry right away. - } - } - } - - /** * Callable to process inside transaction. */ private static class ProcessCallable implements Callable<Void> { http://git-wip-us.apache.org/repos/asf/ignite/blob/67699564/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 4d7e923..13ec665 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; @@ -47,6 +48,7 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.configuration.CacheConfiguration; @@ -74,12 +76,16 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.junits.GridAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Super class for all common tests. @@ -986,4 +992,39 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { idx++; } } + + /** + * @param ignite Ignite instance. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception If failed. + */ + protected <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.call(); + + tx.commit(); + + return res; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + IgniteFuture<?> fut = e.retryReadyFuture(); + + fut.get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } } \ No newline at end of file
