ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316dcc69 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316dcc69 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316dcc69 Branch: refs/heads/ignite-5578 Commit: 316dcc693bf285eab7793b4bb7e79e0509f3d897 Parents: b3f4407 Author: sboikov <[email protected]> Authored: Thu Jul 27 16:07:51 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 27 17:11:45 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 5 ++ .../processors/cache/GridCacheProcessor.java | 4 + .../dht/atomic/GridDhtAtomicCache.java | 65 +++++++--------- .../GridDhtPartitionsExchangeFuture.java | 60 +++++--------- .../cache/transactions/IgniteTxHandler.java | 82 ++++++++++++++++++-- .../closure/GridClosureProcessor.java | 22 ++++-- .../org/apache/ignite/thread/IgniteThread.java | 4 + .../distributed/CacheExchangeMergeTest.java | 31 +++++--- 8 files changed, 173 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c0e6a11..fc08630 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1882,6 +1882,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } + if (evt.type() == EVT_NODE_JOINED && cctx.cache().receivedCachesFromNodeJoin(node)) { + log.info("Stop merge, received caches from node: " + node); + + break; + } log.info("Merge exchange future [curFut=" + curFut.initialVersion() + ", mergedFut=" + fut.initialVersion() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 88fa9f1..5a99bbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1759,6 +1759,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + public boolean receivedCachesFromNodeJoin(ClusterNode node) { + return !cachesInfo.cachesReceivedFromJoin(node.id()).isEmpty(); + } + /** * Starts statically configured caches received from remote nodes during exchange. * http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 75d060f..93d22aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1763,8 +1763,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); - GridDhtTopologyFuture topFut = null; - GridDhtAtomicAbstractUpdateFuture dhtFut = null; IgniteCacheExpiryPolicy expiry = null; @@ -1794,24 +1792,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return true; } - topFut = top.topologyVersionFuture(); + GridDhtTopologyFuture topFut = top.topologyVersionFuture(); - if (topFut.isDone()) { - topFut = null; + if (!topFut.isDone()) + return false; // Will wait at the beginning of next updateAllAsyncInternal0 call. - // Do not check topology version if topology was locked on near node by - // external transaction or explicit lock. - if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { - DhtAtomicUpdateResult updRes = update(node, locked, req, res); + // Do not check topology version if topology was locked on near node by + // external transaction or explicit lock. + if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { + DhtAtomicUpdateResult updRes = update(node, locked, req, res); - dhtFut = updRes.dhtFuture(); - deleted = updRes.deleted(); - expiry = updRes.expiryPolicy(); - } - else - // Should remap all keys. - res.remapTopologyVersion(top.topologyVersion()); + dhtFut = updRes.dhtFuture(); + deleted = updRes.deleted(); + expiry = updRes.expiryPolicy(); } + else + // Should remap all keys. + res.remapTopologyVersion(top.topologyVersion()); } finally { top.readUnlock(); @@ -1865,26 +1862,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.shared().database().checkpointReadUnlock(); } - if (topFut == null) { - if (res.remapTopologyVersion() != null) { - assert dhtFut == null; + if (res.remapTopologyVersion() != null) { + assert dhtFut == null; - completionCb.apply(req, res); - } - else { - if (dhtFut != null) - dhtFut.map(node, res.returnValue(), res, completionCb); - } + completionCb.apply(req, res); + } + else { + if (dhtFut != null) + dhtFut.map(node, res.returnValue(), res, completionCb); + } - if (req.writeSynchronizationMode() != FULL_ASYNC) - req.cleanup(!node.isLocal()); + if (req.writeSynchronizationMode() != FULL_ASYNC) + req.cleanup(!node.isLocal()); - sendTtlUpdateRequest(expiry); + sendTtlUpdateRequest(expiry); - return true; - } - else - return waitForTopologyFuture(node, req, completionCb); + return true; } /** @@ -1902,12 +1895,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Thread curThread = Thread.currentThread(); if (curThread instanceof IgniteThread) { - IgniteThread thread = (IgniteThread)curThread; + final IgniteThread thread = (IgniteThread)curThread; - if (thread.policy() == GridIoPolicy.SYSTEM_POOL) { + if (thread.hasStripeOrPolicy()) { topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - ctx.closures().runLocalSafe(new Runnable() { + ctx.closures().runLocalWithThreadPolicy(thread, new Runnable() { @Override public void run() { updateAllAsyncInternal0(node, req, completionCb); } @@ -3307,7 +3300,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (NodeStoppingException e){ - U.error(log, "Failed to update key on backup (local node is stopping):" + key, e); + U.warn(log, "Failed to update key on backup (local node is stopping): " + key); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f749833..ebd305a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -563,8 +563,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte initCachesOnLocalJoin(); } - // TODO IGNITE-5578: caches from joining nodes for merged exchanges. - if (exchCtx.mergeExchanges()) { if (localJoinExchange()) { if (cctx.kernalContext().clientNode()) { @@ -575,14 +573,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else { onServerNodeEvent(crdNode); - exchange = ExchangeType.ALL_2; + exchange = ExchangeType.ALL; } } else { if (CU.clientNode(discoEvt.eventNode())) exchange = onClientNodeEvent(crdNode); else - exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL_2; + exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; } } else { @@ -609,12 +607,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte break; } - case ALL_2: { - distributedExchange2(); - - break; - } - case CLIENT: { if (!exchCtx.mergeExchanges()) initTopologies(); @@ -933,22 +925,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @throws IgniteCheckedException If failed. */ - private void distributedExchange2() throws IgniteCheckedException { - waitPartitionRelease(); - - if (state == ExchangeLocalState.CRD) { - if (remaining.isEmpty()) - onAllReceived(); - } - else - sendPartitions(crd); - - initDone(); - } - - /** - * @throws IgniteCheckedException If failed. - */ private void distributedExchange() throws IgniteCheckedException { assert crd != null; @@ -980,13 +956,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal() || cacheGroupStopping(grp.groupId())) - continue; + if (!exchCtx.mergeExchanges()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; - // It is possible affinity is not initialized yet if node joins to cluster. - if (grp.affinity().lastVersion().topologyVersion() > 0) - grp.topology().beforeExchange(this, !centralizedAff); + // It is possible affinity is not initialized yet if node joins to cluster. + if (grp.affinity().lastVersion().topologyVersion() > 0) + grp.topology().beforeExchange(this, !centralizedAff); + } } cctx.database().beforeExchange(this); @@ -2276,10 +2254,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - // TODO IGNITE-5578: process all merged events. - if (discoEvt.type() == EVT_NODE_JOINED) - assignPartitionsStates(); - else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { assert discoEvt instanceof DiscoveryCustomEvent; if (activateCluster()) @@ -2294,8 +2269,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } - else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) - detectLostPartitions(); + else { + if (exchCtx.events().serverJoin()) + assignPartitionsStates(); + + if (exchCtx.events().serverLeft()) + detectLostPartitions(); + } updateLastVersion(cctx.versions().last()); @@ -2417,6 +2397,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param finishState State. * @param msg Request. * @param nodeId Node ID. */ @@ -3218,9 +3199,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ALL, /** */ - ALL_2, - - /** */ NONE } http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 2ed8ed7..54cfc2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -42,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; @@ -74,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; @@ -109,10 +112,10 @@ public class IgniteTxHandler { private GridCacheSharedContext<?, ?> ctx; /** - * @param nearNodeId Node ID. + * @param nearNodeId Sender node ID. * @param req Request. */ - private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { + private void processNearTxPrepareRequest(UUID nearNodeId, GridNearTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + ", node=" + nearNodeId + ']'); @@ -130,7 +133,29 @@ public class IgniteTxHandler { return; } - IgniteInternalFuture<GridNearTxPrepareResponse> fut = prepareNearTx(nearNode, req); + processNearTxPrepareRequest0(nearNode, req); + } + + /** + * @param nearNode Sender node. + * @param req Request. + */ + private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) { + IgniteInternalFuture<GridNearTxPrepareResponse> fut; + + if (req.firstClientRequest()) { + for (;;) { + if (waitForExchangeFuture(nearNode, req)) + return; + + fut = prepareNearTx(nearNode, req); + + if (fut != null) + break; + } + } + else + fut = prepareNearTx(nearNode, req); assert req.txState() != null || fut == null || fut.error() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null); @@ -303,9 +328,9 @@ public class IgniteTxHandler { /** * @param nearNode Node that initiated transaction. * @param req Near prepare request. - * @return Prepare future. + * @return Prepare future or {@code null} if need retry operation. */ - private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx( + @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx( final ClusterNode nearNode, final GridNearTxPrepareRequest req ) { @@ -348,6 +373,11 @@ public class IgniteTxHandler { top = firstEntry.context().topology(); top.readLock(); + + GridDhtTopologyFuture topFut = top.topologyVersionFuture(); + + if (!topFut.isDone()) + return null; } try { @@ -499,6 +529,48 @@ public class IgniteTxHandler { } /** + * @param node Sender node. + * @param req Request. + * @return {@code True} if update will be retried from future listener. + */ + private boolean waitForExchangeFuture(final ClusterNode node, final GridNearTxPrepareRequest req) { + assert req.firstClientRequest() : req; + + GridDhtTopologyFuture topFut = ctx.exchange().lastTopologyFuture(); + + if (!topFut.isDone()) { + Thread curThread = Thread.currentThread(); + + if (curThread instanceof IgniteThread) { + final IgniteThread thread = (IgniteThread)curThread; + + if (thread.hasStripeOrPolicy()) { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() { + @Override public void run() { + processNearTxPrepareRequest0(node, req); + } + }); + } + }); + + return true; + } + } + + try { + topFut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Topology future failed: " + e, e); + } + } + + return false; + } + + /** * @param expVer Expected topology version. * @param curVer Current topology version. * @param req Request. http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 01207e3..201815c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -69,6 +69,7 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.LoadBalancerResource; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -779,13 +780,22 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * @param c Closure to execute. - * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. - * @return Future. - * @throws IgniteCheckedException Thrown in case of any errors. + * @param thread Thread. + * @param c Closure. */ - private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { - return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL); + public void runLocalWithThreadPolicy(IgniteThread thread, Runnable c) { + assert thread.stripe() >= 0 || thread.policy() != GridIoPolicy.UNDEFINED : thread; + + if (thread.stripe() >= 0) + ctx.getStripedExecutorService().execute(thread.stripe(), c); + else { + try { + ctx.pools().poolForPolicy(thread.policy()).execute(c); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get pool for policy: " + thread.policy(), e); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java index c814625..f0d39fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java @@ -125,6 +125,10 @@ public class IgniteThread extends Thread { return stripe; } + public boolean hasStripeOrPolicy() { + return stripe >= 0 || plc != GridIoPolicy.UNDEFINED; + } + /** * Gets name of the Ignite instance this thread belongs to. * http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 011a01d..16bac3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -59,7 +59,9 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** @@ -76,7 +78,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { private boolean testSpi; /** */ - private static String[] cacheNames = {"c1"/*, "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"*/}; + private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"}; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -96,16 +98,16 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } cfg.setCacheConfiguration( - cacheConfiguration("c1", ATOMIC, PARTITIONED, 0) -// cacheConfiguration("c2", ATOMIC, PARTITIONED, 1), -// cacheConfiguration("c3", ATOMIC, PARTITIONED, 2), -// cacheConfiguration("c4", ATOMIC, PARTITIONED, 10), -// cacheConfiguration("c5", ATOMIC, REPLICATED, 0), -// cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0), -// cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1), -// cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2), -// cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10), -// cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0) + cacheConfiguration("c1", ATOMIC, PARTITIONED, 0), + cacheConfiguration("c2", ATOMIC, PARTITIONED, 1), + cacheConfiguration("c3", ATOMIC, PARTITIONED, 2), + cacheConfiguration("c4", ATOMIC, PARTITIONED, 10), + cacheConfiguration("c5", ATOMIC, REPLICATED, 0), + cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0), + cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1), + cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2), + cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10), + cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0) ); return cfg; @@ -121,10 +123,15 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @param name Cache name. * @param atomicityMode Cache atomicity mode. + * @param cacheMode Cache mode. * @param backups Number of backups. * @return Cache configuration. */ - private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode, CacheMode cacheMode, int backups) { + private CacheConfiguration cacheConfiguration(String name, + CacheAtomicityMode atomicityMode, + CacheMode cacheMode, + int backups) + { CacheConfiguration ccfg = new CacheConfiguration(name); ccfg.setAtomicityMode(atomicityMode);
