Fixed hang in data structures failover tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/41fe4695 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/41fe4695 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/41fe4695 Branch: refs/heads/ignite-1270 Commit: 41fe46953af80304e05c76007df3c30b8d681742 Parents: e9f22ab Author: sboikov <[email protected]> Authored: Fri Nov 27 12:33:14 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 27 12:33:14 2015 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 45 ++++++++++++-------- .../datastructures/DataStructuresProcessor.java | 20 +-------- ...gniteAtomicLongChangingTopologySelfTest.java | 30 ++++++++++--- 3 files changed, 53 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/41fe4695/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 431be1e..ec787f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -383,7 +383,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { if (create) { hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); - GridCacheSetHeader old = retryPutIfAbsent(cache, key, hdr); + GridCacheSetHeader old = (GridCacheSetHeader)cache.getAndPutIfAbsent(key, hdr); if (old != null) hdr = old; @@ -493,6 +493,12 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { continue; } + else if (!pingNodes(nodes)) { + if (log.isDebugEnabled()) + log.debug("RemoveSetData job failed and set data node left, will retry: " + e); + + continue; + } else throw e; } @@ -510,6 +516,12 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { continue; } + else if (!pingNodes(nodes)) { + if (log.isDebugEnabled()) + log.debug("RemoveSetData job failed and set data node left, will retry: " + e); + + continue; + } else throw e; } @@ -526,6 +538,20 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** + * @param nodes Nodes to ping. + * @return {@code True} if was able to ping all nodes. + * @throws IgniteCheckedException If failed/ + */ + private boolean pingNodes(Collection<ClusterNode> nodes) throws IgniteCheckedException { + for (ClusterNode node : nodes) { + if (!cctx.discovery().pingNode(node.id())) + return false; + } + + return true; + } + + /** * @param key Set item key. * @param rmv {@code True} if item was removed. */ @@ -562,23 +588,6 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { /** * @param cache Cache. - * @param key Key. - * @param val Value. - * @throws IgniteCheckedException If failed. - * @return Previous value. - */ - @SuppressWarnings("unchecked") - @Nullable private <T> T retryPutIfAbsent(final IgniteInternalCache cache, final Object key, final T val) - throws IgniteCheckedException { - return DataStructuresProcessor.retry(log, new Callable<T>() { - @Nullable @Override public T call() throws Exception { - return (T)cache.getAndPutIfAbsent(key, val); - } - }); - } - - /** - * @param cache Cache. * @param keys Keys to remove. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/41fe4695/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 23d64cf..998bd92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -973,7 +973,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { IgniteOutClosureX<GridCacheQueueHeader> rmv = new IgniteOutClosureX<GridCacheQueueHeader>() { @Override public GridCacheQueueHeader applyx() throws IgniteCheckedException { - return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name)); + return (GridCacheQueueHeader)cctx.cache().getAndRemove(new GridCacheQueueHeaderKey(name)); } }; @@ -1569,7 +1569,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { IgniteOutClosureX<GridCacheSetHeader> rmv = new IgniteOutClosureX<GridCacheSetHeader>() { @Override public GridCacheSetHeader applyx() throws IgniteCheckedException { - return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name)); + return (GridCacheSetHeader)cctx.cache().getAndRemove(new GridCacheSetHeaderKey(name)); } }; @@ -1583,22 +1583,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** - * @param cache Cache. - * @param key Key to remove. - * @throws IgniteCheckedException If failed. - * @return Removed value. - */ - @SuppressWarnings("unchecked") - @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key) - throws IgniteCheckedException { - return retry(log, new Callable<T>() { - @Nullable @Override public T call() throws Exception { - return (T)cache.getAndRemove(key); - } - }); - } - - /** * @param log Logger. * @param call Callable. * @return Callable result. http://git-wip-us.apache.org/repos/asf/ignite/blob/41fe4695/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java index c00557d..a8d5801 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -56,7 +57,7 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract /** Grid count. */ private static final int GRID_CNT = 5; - /** Restart cound. */ + /** Restart count. */ private static final int RESTART_CNT = 15; /** Atomic long name. */ @@ -133,8 +134,6 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract * @throws Exception If failed. */ public void testClientAtomicLongCreateCloseFailover() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1732"); - testFailoverWithClient(new IgniteInClosure<Ignite>() { @Override public void apply(Ignite ignite) { for (int i = 0; i < 100; i++) { @@ -150,8 +149,6 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract * @throws Exception If failed. */ public void testClientQueueCreateCloseFailover() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1976"); - testFailoverWithClient(new IgniteInClosure<Ignite>() { @Override public void apply(Ignite ignite) { for (int i = 0; i < 100; i++) { @@ -170,6 +167,27 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract } /** + * @throws Exception If failed. + */ + public void testClientSetCreateCloseFailover() throws Exception { + testFailoverWithClient(new IgniteInClosure<Ignite>() { + @Override public void apply(Ignite ignite) { + for (int i = 0; i < 100; i++) { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setBackups(1); + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); + + IgniteSet set = ignite.set("set-" + i, colCfg); + + set.close(); + } + } + }); + } + + /** * @param c Test iteration closure. * @throws Exception If failed. */ @@ -188,7 +206,7 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract IgniteInternalFuture<?> fut = restartThread(finished); - long stop = System.currentTimeMillis() + 30_000; + long stop = System.currentTimeMillis() + 60_000; try { int iter = 0;
