ignite-1534 Fixed races in dynamic cache start exchange ordering.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b54cbd7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b54cbd7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b54cbd7 Branch: refs/heads/ignite-1272 Commit: 7b54cbd7499cd498b04e821dfa3b572bd94debec Parents: a411f94 Author: sboikov <sboi...@gridgain.com> Authored: Fri Oct 2 11:19:06 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Oct 2 11:19:06 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 2 +- .../cache/DynamicCacheDescriptor.java | 17 ++ .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 20 ++- .../GridCachePartitionExchangeManager.java | 72 ++------- .../processors/cache/GridCacheProcessor.java | 26 ++-- .../cache/distributed/dht/GridDhtGetFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 5 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 9 +- .../distributed/near/GridNearGetFuture.java | 2 + .../cache/IgniteCachePutAllRestartTest.java | 4 +- .../CacheGetFutureHangsSelfTest.java | 156 +++++++++---------- .../distributed/IgniteCacheCreatePutTest.java | 125 +++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 5 + 14 files changed, 284 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b694523..a6f5f08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -550,7 +550,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { gridStartTime = getSpi().getGridStartTime(); updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), - new DiscoCache(localNode(), getSpi().getRemoteNodes())); + new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id())))); startLatch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 24df7e4..b100a31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -68,6 +68,9 @@ public class DynamicCacheDescriptor { /** */ private AffinityTopologyVersion startTopVer; + /** */ + private boolean rcvdOnDiscovery; + /** * @param ctx Context. * @param cacheCfg Cache configuration. @@ -236,6 +239,20 @@ public class DynamicCacheDescriptor { this.updatesAllowed = updatesAllowed; } + /** + * @return {@code True} if received in discovery data. + */ + public boolean receivedOnDiscovery() { + return rcvdOnDiscovery; + } + + /** + * @param rcvdOnDiscovery {@code True} if received in discovery data. + */ + public void receivedOnDiscovery(boolean rcvdOnDiscovery) { + this.rcvdOnDiscovery = rcvdOnDiscovery; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 5385dec..3a1cee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1848,7 +1848,7 @@ public class GridCacheContext<K, V> implements Externalizable { boolean deserializePortable, boolean cpy) { assert key != null; - assert val != null; + assert val != null || skipVals; if (!keepCacheObjects) { Object key0 = key.value(cacheObjCtx, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dd51da2..0960c9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -391,13 +391,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param futVer Future ID. * @param fut Future. + * @return {@code False} if future was forcibly completed with error. */ - public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { + public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; - onFutureAdded(fut); + return onFutureAdded(fut); } /** @@ -529,12 +530,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param fut Future. + * @return {@code False} if future was forcibly completed with error. */ - private void onFutureAdded(IgniteInternalFuture<?> fut) { - if (stopping) + private boolean onFutureAdded(IgniteInternalFuture<?> fut) { + if (stopping) { ((GridFutureAdapter)fut).onDone(stopError()); - else if (cctx.kernalContext().clientDisconnected()) + + return false; + } + else if (cctx.kernalContext().clientDisconnected()) { ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + + return false; + } + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3e77e0d..adc2174 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -105,18 +105,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Partition resend timeout after eviction. */ private final long partResendTimeout = getLong(IGNITE_PRELOAD_RESEND_TIMEOUT, DFLT_PRELOAD_RESEND_TIMEOUT); - /** Latch which completes after local exchange future is created. */ - private GridFutureAdapter<?> locExchFut; - /** */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Last partition refresh. */ private final AtomicLong lastRefresh = new AtomicLong(-1); - /** Pending futures. */ - private final Queue<GridDhtPartitionsExchangeFuture> pendingExchangeFuts = new ConcurrentLinkedQueue<>(); - /** */ @GridToStringInclude private ExchangeWorker exchWorker; @@ -229,31 +223,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (exchId != null) { - // Start exchange process. - pendingExchangeFuts.add(exchFut); + if (log.isDebugEnabled()) + log.debug("Discovery event (will start exchange): " + exchId); // Event callback - without this callback future will never complete. exchFut.onEvent(exchId, e); + // Start exchange process. + addFuture(exchFut); + } + else { if (log.isDebugEnabled()) - log.debug("Discovery event (will start exchange): " + exchId); - - locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - if (!enterBusy()) - return; - - try { - // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) - addFuture(f); - } - finally { - leaveBusy(); - } - } - }); + log.debug("Do not start exchange for discovery event: " + evt); } } finally { @@ -266,8 +247,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - locExchFut = new GridFutureAdapter<>(); - exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, @@ -328,12 +307,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); - - onDiscoveryEvent(cctx.localNodeId(), fut); + exchWorker.futQ.addFirst(fut); - // Allow discovery events to get processed. - locExchFut.onDone(); + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); if (reconnect) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -382,8 +358,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onInitialExchangeComplete(null); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.startTopologyVersion() == null) + cacheCtx.preloader().onInitialExchangeComplete(null); + } if (log.isDebugEnabled()) log.debug("Finished waiting for initial exchange: " + fut.exchangeId()); @@ -414,12 +392,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (AffinityReadyFuture f : readyFuts.values()) f.onDone(stopErr); - for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts) - f.onDone(stopErr); - - if (locExchFut != null) - locExchFut.onDone(stopErr); - U.cancel(exchWorker); if (log.isDebugEnabled()) @@ -583,22 +555,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param nodeId New node ID. - * @param fut Exchange future. - */ - void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture fut) { - if (!enterBusy()) - return; - - try { - addFuture(fut); - } - finally { - leaveBusy(); - } - } - - /** * @param evt Discovery event. * @return Affinity topology version. */ @@ -1033,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Pending exchange futures:"); - for (GridDhtPartitionsExchangeFuture fut : pendingExchangeFuts) + for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ) U.warn(log, ">>> " + fut); ExchangeFutureSet exchFuts = this.exchFuts; http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 6c13399..daa4475 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -805,7 +805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); - if (loc || CU.affinityNode(locNode, filter)) { + if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); @@ -1958,7 +1958,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (req.initiatingNodeId() == null) desc.staticallyConfigured(true); - registeredCaches.put(maskNull(req.cacheName()), desc); + desc.receivedOnDiscovery(true); + + DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); + + assert old == null : old; ctx.discovery().setCacheFilter( req.cacheName(), @@ -2474,10 +2478,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } else { - if (req.clientStartOnly()) { - assert req.initiatingNodeId() != null : req; + assert req.initiatingNodeId() != null : req; + + // Cache already exists, exchange is needed only if client cache should be created. + ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - needExchange = ctx.discovery().addClientNode(req.cacheName(), + boolean clientReq = node != null && + !ctx.discovery().cacheAffinityNode(node, req.cacheName()); + + if (req.clientStartOnly()) { + needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), req.initiatingNodeId(), req.nearCacheConfiguration() != null); } @@ -2488,12 +2498,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { "(a cache with the same name is already started): " + U.maskName(req.cacheName()))); } else { - // Cache already exists, exchange is needed only if client cache should be created. - ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - - boolean clientReq = node != null && - !ctx.discovery().cacheAffinityNode(node, req.cacheName()); - needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(), req.initiatingNodeId(), req.nearCacheConfiguration() != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 76aaf72..a67b1de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -447,8 +447,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (v == null) it.remove(); - else if (!skipVals) - info.value((CacheObject)v); + else + info.value(skipVals ? null : (CacheObject)v); } return infos; http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 0202c53..abbe7b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -587,8 +587,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (keysSize != 0) { Map<K, V> map = new GridLeanMap<>(keysSize); - for (GridCacheEntryInfo info : infos) + for (GridCacheEntryInfo info : infos) { + assert skipVals == (info.value() == null); + cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); + } return map; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index fb2c5ad..41df53a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -825,8 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> futVer = cctx.versions().next(topVer); - if (storeFuture()) - cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { + assert isDone() : GridNearAtomicUpdateFuture.this; + + return; + } + } // Assign version on near node in CLOCK ordering mode even if fastMap is false. if (updVer == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a7875f6..d9763f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -703,6 +703,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma CacheObject val = info.value(); KeyCacheObject key = info.key(); + assert skipVals == (info.value() == null); + cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); } catch (GridCacheEntryRemovedException ignore) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java index fc14085..ae99926 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java @@ -163,12 +163,12 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest { info("Running iteration on the node [idx=" + node + ", nodeId=" + ignite.cluster().localNode().id() + ']'); + final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME); + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { Thread.currentThread().setName("put-thread"); - IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME); - Random rnd = new Random(); long endTime = System.currentTimeMillis() + 60_000; http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java index 8e8447e..e8622aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java @@ -18,23 +18,21 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; @@ -45,22 +43,14 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { /** Grid count. */ private static final int GRID_CNT = 8; - /** Grids. */ - private static Ignite[] grids; + /** */ + private AtomicReferenceArray<Ignite> nodes; - /** Ids. */ - private static String[] ids; - - /** Flags. */ - private static AtomicBoolean[] flags; - - /** Futs. */ - private static Collection<IgniteInternalFuture> futs; - - /** Alive grids. */ - private static Set<Integer> aliveGrids; + /** */ + private volatile boolean done; /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -81,17 +71,27 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + /** * @throws Exception If failed. */ - public void testFailover() throws Exception { - int cnt = 10; + public void testContainsKeyFailover() throws Exception { + int cnt = 3; for (int i = 0; i < cnt; i++) { try { - U.debug("*** Iteration " + (i + 1) + '/' + cnt); - - init(); + log.info("Iteration: " + (i + 1) + '/' + cnt); doTestFailover(); } @@ -102,54 +102,34 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { } /** - * Initializes test. - */ - private void init() { - grids = new Ignite[GRID_CNT + 1]; - - ids = new String[GRID_CNT + 1]; - - aliveGrids = new HashSet<>(); - - flags = new AtomicBoolean[GRID_CNT + 1]; - - futs = new ArrayList<>(); - } - - /** * Executes one test iteration. + * @throws Exception If failed. */ private void doTestFailover() throws Exception { try { - for (int i = 0; i < GRID_CNT + 1; i++) { - final IgniteEx grid = startGrid(i); + done = false; - grids[i] = grid; + nodes = new AtomicReferenceArray<>(GRID_CNT); - ids[i] = grid.localNode().id().toString(); + startGridsMultiThreaded(GRID_CNT, false); - aliveGrids.add(i); + for (int i = 0; i < GRID_CNT ; i++) + assertTrue(nodes.compareAndSet(i, null, ignite(i))); - flags[i] = new AtomicBoolean(); - } + List<IgniteInternalFuture> futs = new ArrayList<>(); for (int i = 0; i < GRID_CNT + 1; i++) { - final int gridIdx = i; - futs.add(multithreadedAsync(new Runnable() { @Override public void run() { - IgniteCache cache = grids[gridIdx].cache(null); + T2<Ignite, Integer> ignite; - while (!flags[gridIdx].get()) { - int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1); + while ((ignite = randomNode()) != null) { + IgniteCache<Object, Object> cache = ignite.get1().cache(null); - String id = ids[idx]; + for (int i = 0; i < 100; i++) + cache.containsKey(ThreadLocalRandom.current().nextInt(100_000)); - if (id != null /*&& grids[gridIdx] != null*/) { - //U.debug("!!! Grid containsKey start " + gridIdx); - cache.containsKey(id); - //U.debug("!!! Grid containsKey finished " + gridIdx); - } + assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1())); try { Thread.sleep(ThreadLocalRandom.current().nextLong(50)); @@ -163,18 +143,15 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { futs.add(multithreadedAsync(new Runnable() { @Override public void run() { - IgniteCache cache = grids[gridIdx].cache(null); + T2<Ignite, Integer> ignite; - while (!flags[gridIdx].get()) { - int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1); + while ((ignite = randomNode()) != null) { + IgniteCache<Object, Object> cache = ignite.get1().cache(null); - String id = ids[idx]; + for (int i = 0; i < 100; i++) + cache.put(ThreadLocalRandom.current().nextInt(100_000), UUID.randomUUID()); - if (id != null /*&& grids[gridIdx] != null*/) { - //U.debug("!!! Grid put start " + gridIdx); - cache.put(id, UUID.randomUUID()); - //U.debug("!!! Grid put finished " + gridIdx); - } + assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1())); try { Thread.sleep(ThreadLocalRandom.current().nextLong(50)); @@ -187,35 +164,50 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { }, 1, "put-thread-" + i)); } - while (aliveGrids.size() > 1) { - final int gridToKill = ThreadLocalRandom.current().nextInt(GRID_CNT) + 1; + try { + int aliveGrids = GRID_CNT; - if (gridToKill > 0 && grids[gridToKill] != null) { - U.debug("!!! Trying to kill grid " + gridToKill); + while (aliveGrids > 0) { + T2<Ignite, Integer> ignite = randomNode(); - //synchronized (mons[gridToKill]) { - U.debug("!!! Grid stop start " + gridToKill); + assert ignite != null; - grids[gridToKill].close(); + Ignite ignite0 = ignite.get1(); - aliveGrids.remove(gridToKill); + log.info("Stop node: " + ignite0.name()); - grids[gridToKill] = null; + ignite0.close(); - flags[gridToKill].set(true); + log.info("Node stop finished: " + ignite0.name()); - U.debug("!!! Grid stop finished " + gridToKill); - //} + aliveGrids--; } } + finally { + done = true; + } - Thread.sleep(ThreadLocalRandom.current().nextLong(100)); + for (IgniteInternalFuture fut : futs) + fut.get(); } finally { - flags[0].set(true); + done = true; + } + } - for (IgniteInternalFuture fut : futs) - fut.get(); + /** + * @return Random node and its index. + */ + @Nullable private T2<Ignite, Integer> randomNode() { + while (!done) { + int idx = ThreadLocalRandom.current().nextInt(GRID_CNT); + + Ignite ignite = nodes.get(idx); + + if (ignite != null && nodes.compareAndSet(idx, ignite, null)) + return new T2<>(ignite, idx); } + + return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java new file mode 100644 index 0000000..8b3d9d3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + OptimizedMarshaller marsh = new OptimizedMarshaller(); + marsh.setRequireSerializable(false); + + cfg.setMarshaller(marsh); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName("cache*"); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + long stopTime = System.currentTimeMillis() + 2 * 60_000; + + try { + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + log.info("Iteration: " + iter++); + + try { + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + int node = idx.getAndIncrement(); + + Ignite ignite = startGrid(node); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache1"); + + assertNotNull(cache); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + return null; + } + }, GRID_CNT, "start"); + } + finally { + stopAllGrids(); + } + } + } + finally { + stopAllGrids(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index f8c9d26..b89bffd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -77,8 +77,10 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest; import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest; @@ -205,6 +207,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class); suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class); + suite.addTestSuite(IgniteCacheCreatePutTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class); @@ -278,6 +281,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CrossCacheLockTest.class); suite.addTestSuite(IgniteCrossCacheTxSelfTest.class); + suite.addTestSuite(CacheGetFutureHangsSelfTest.class); + return suite; } } \ No newline at end of file