ignite-5578

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316dcc69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316dcc69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316dcc69

Branch: refs/heads/ignite-5578
Commit: 316dcc693bf285eab7793b4bb7e79e0509f3d897
Parents: b3f4407
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jul 27 16:07:51 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jul 27 17:11:45 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  5 ++
 .../processors/cache/GridCacheProcessor.java    |  4 +
 .../dht/atomic/GridDhtAtomicCache.java          | 65 +++++++---------
 .../GridDhtPartitionsExchangeFuture.java        | 60 +++++---------
 .../cache/transactions/IgniteTxHandler.java     | 82 ++++++++++++++++++--
 .../closure/GridClosureProcessor.java           | 22 ++++--
 .../org/apache/ignite/thread/IgniteThread.java  |  4 +
 .../distributed/CacheExchangeMergeTest.java     | 31 +++++---
 8 files changed, 173 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c0e6a11..fc08630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1882,6 +1882,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         break;
                     }
+                    if (evt.type() == EVT_NODE_JOINED && 
cctx.cache().receivedCachesFromNodeJoin(node)) {
+                        log.info("Stop merge, received caches from node: " + 
node);
+
+                        break;
+                    }
 
                     log.info("Merge exchange future [curFut=" + 
curFut.initialVersion() +
                         ", mergedFut=" + fut.initialVersion() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 88fa9f1..5a99bbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1759,6 +1759,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
     }
 
+    public boolean receivedCachesFromNodeJoin(ClusterNode node) {
+        return !cachesInfo.cachesReceivedFromJoin(node.id()).isEmpty();
+    }
+
     /**
      * Starts statically configured caches received from remote nodes during 
exchange.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 75d060f..93d22aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1763,8 +1763,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || 
req.size() == 1);
 
-        GridDhtTopologyFuture topFut = null;
-
         GridDhtAtomicAbstractUpdateFuture dhtFut = null;
 
         IgniteCacheExpiryPolicy expiry = null;
@@ -1794,24 +1792,23 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         return true;
                     }
 
-                    topFut = top.topologyVersionFuture();
+                    GridDhtTopologyFuture topFut = top.topologyVersionFuture();
 
-                    if (topFut.isDone()) {
-                        topFut = null;
+                    if (!topFut.isDone())
+                        return false; // Will wait at the beginning of next 
updateAllAsyncInternal0 call.
 
-                        // Do not check topology version if topology was 
locked on near node by
-                        // external transaction or explicit lock.
-                        if (req.topologyLocked() || 
!needRemap(req.topologyVersion(), top.topologyVersion())) {
-                            DhtAtomicUpdateResult updRes = update(node, 
locked, req, res);
+                    // Do not check topology version if topology was locked on 
near node by
+                    // external transaction or explicit lock.
+                    if (req.topologyLocked() || 
!needRemap(req.topologyVersion(), top.topologyVersion())) {
+                        DhtAtomicUpdateResult updRes = update(node, locked, 
req, res);
 
-                            dhtFut = updRes.dhtFuture();
-                            deleted = updRes.deleted();
-                            expiry = updRes.expiryPolicy();
-                        }
-                        else
-                            // Should remap all keys.
-                            res.remapTopologyVersion(top.topologyVersion());
+                        dhtFut = updRes.dhtFuture();
+                        deleted = updRes.deleted();
+                        expiry = updRes.expiryPolicy();
                     }
+                    else
+                        // Should remap all keys.
+                        res.remapTopologyVersion(top.topologyVersion());
                 }
                 finally {
                     top.readUnlock();
@@ -1865,26 +1862,22 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             ctx.shared().database().checkpointReadUnlock();
         }
 
-        if (topFut == null) {
-            if (res.remapTopologyVersion() != null) {
-                assert dhtFut == null;
+        if (res.remapTopologyVersion() != null) {
+            assert dhtFut == null;
 
-                completionCb.apply(req, res);
-            }
-            else {
-                if (dhtFut != null)
-                    dhtFut.map(node, res.returnValue(), res, completionCb);
-            }
+            completionCb.apply(req, res);
+        }
+        else {
+            if (dhtFut != null)
+                dhtFut.map(node, res.returnValue(), res, completionCb);
+        }
 
-            if (req.writeSynchronizationMode() != FULL_ASYNC)
-                req.cleanup(!node.isLocal());
+        if (req.writeSynchronizationMode() != FULL_ASYNC)
+            req.cleanup(!node.isLocal());
 
-            sendTtlUpdateRequest(expiry);
+        sendTtlUpdateRequest(expiry);
 
-            return true;
-        }
-        else
-            return waitForTopologyFuture(node, req, completionCb);
+        return true;
     }
 
     /**
@@ -1902,12 +1895,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             Thread curThread = Thread.currentThread();
 
             if (curThread instanceof IgniteThread) {
-                IgniteThread thread = (IgniteThread)curThread;
+                final IgniteThread thread = (IgniteThread)curThread;
 
-                if (thread.policy() == GridIoPolicy.SYSTEM_POOL) {
+                if (thread.hasStripeOrPolicy()) {
                     topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            ctx.closures().runLocalSafe(new Runnable() {
+                            ctx.closures().runLocalWithThreadPolicy(thread, 
new Runnable() {
                                 @Override public void run() {
                                     updateAllAsyncInternal0(node, req, 
completionCb);
                                 }
@@ -3307,7 +3300,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     }
                 }
                 catch (NodeStoppingException e){
-                    U.error(log, "Failed to update key on backup (local node 
is stopping):" + key, e);
+                    U.warn(log, "Failed to update key on backup (local node is 
stopping): " + key);
 
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f749833..ebd305a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -563,8 +563,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         initCachesOnLocalJoin();
                 }
 
-                // TODO IGNITE-5578: caches from joining nodes for merged 
exchanges.
-
                 if (exchCtx.mergeExchanges()) {
                     if (localJoinExchange()) {
                         if (cctx.kernalContext().clientNode()) {
@@ -575,14 +573,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         else {
                             onServerNodeEvent(crdNode);
 
-                            exchange = ExchangeType.ALL_2;
+                            exchange = ExchangeType.ALL;
                         }
                     }
                     else {
                         if (CU.clientNode(discoEvt.eventNode()))
                             exchange = onClientNodeEvent(crdNode);
                         else
-                            exchange = cctx.kernalContext().clientNode() ? 
ExchangeType.CLIENT : ExchangeType.ALL_2;
+                            exchange = cctx.kernalContext().clientNode() ? 
ExchangeType.CLIENT : ExchangeType.ALL;
                     }
                 }
                 else {
@@ -609,12 +607,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     break;
                 }
 
-                case ALL_2: {
-                    distributedExchange2();
-
-                    break;
-                }
-
                 case CLIENT: {
                     if (!exchCtx.mergeExchanges())
                         initTopologies();
@@ -933,22 +925,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * @throws IgniteCheckedException If failed.
      */
-    private void distributedExchange2() throws IgniteCheckedException {
-        waitPartitionRelease();
-
-        if (state == ExchangeLocalState.CRD) {
-            if (remaining.isEmpty())
-                onAllReceived();
-        }
-        else
-            sendPartitions(crd);
-
-        initDone();
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
     private void distributedExchange() throws IgniteCheckedException {
         assert crd != null;
 
@@ -980,13 +956,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             }
         }
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
-                continue;
+        if (!exchCtx.mergeExchanges()) {
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                    continue;
 
-            // It is possible affinity is not initialized yet if node joins to 
cluster.
-            if (grp.affinity().lastVersion().topologyVersion() > 0)
-                grp.topology().beforeExchange(this, !centralizedAff);
+                // It is possible affinity is not initialized yet if node 
joins to cluster.
+                if (grp.affinity().lastVersion().topologyVersion() > 0)
+                    grp.topology().beforeExchange(this, !centralizedAff);
+            }
         }
 
         cctx.database().beforeExchange(this);
@@ -2276,10 +2254,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
-            // TODO IGNITE-5578: process all merged events.
-            if (discoEvt.type() == EVT_NODE_JOINED)
-                assignPartitionsStates();
-            else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+            if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                 assert discoEvt instanceof DiscoveryCustomEvent;
 
                 if (activateCluster())
@@ -2294,8 +2269,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     }
                 }
             }
-            else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == 
EVT_NODE_FAILED)
-                detectLostPartitions();
+            else {
+                if (exchCtx.events().serverJoin())
+                    assignPartitionsStates();
+
+                if (exchCtx.events().serverLeft())
+                    detectLostPartitions();
+            }
 
             updateLastVersion(cctx.versions().last());
 
@@ -2417,6 +2397,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param finishState State.
      * @param msg Request.
      * @param nodeId Node ID.
      */
@@ -3218,9 +3199,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         ALL,
 
         /** */
-        ALL_2,
-
-        /** */
         NONE
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2ed8ed7..54cfc2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,6 +25,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -42,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
@@ -74,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
@@ -109,10 +112,10 @@ public class IgniteTxHandler {
     private GridCacheSharedContext<?, ?> ctx;
 
     /**
-     * @param nearNodeId Node ID.
+     * @param nearNodeId Sender node ID.
      * @param req Request.
      */
-    private void processNearTxPrepareRequest(final UUID nearNodeId, 
GridNearTxPrepareRequest req) {
+    private void processNearTxPrepareRequest(UUID nearNodeId, 
GridNearTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received near prepare request [txId=" + 
req.version() +
                 ", node=" + nearNodeId + ']');
@@ -130,7 +133,29 @@ public class IgniteTxHandler {
             return;
         }
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = 
prepareNearTx(nearNode, req);
+        processNearTxPrepareRequest0(nearNode, req);
+    }
+
+    /**
+     * @param nearNode Sender node.
+     * @param req Request.
+     */
+    private void processNearTxPrepareRequest0(ClusterNode nearNode, 
GridNearTxPrepareRequest req) {
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut;
+
+        if (req.firstClientRequest()) {
+            for (;;) {
+                if (waitForExchangeFuture(nearNode, req))
+                    return;
+
+                fut = prepareNearTx(nearNode, req);
+
+                if (fut != null)
+                    break;
+            }
+        }
+        else
+            fut = prepareNearTx(nearNode, req);
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && 
ctx.tm().nearTx(req.version()) == null);
@@ -303,9 +328,9 @@ public class IgniteTxHandler {
     /**
      * @param nearNode Node that initiated transaction.
      * @param req Near prepare request.
-     * @return Prepare future.
+     * @return Prepare future or {@code null} if need retry operation.
      */
-    private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
+    @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> 
prepareNearTx(
         final ClusterNode nearNode,
         final GridNearTxPrepareRequest req
     ) {
@@ -348,6 +373,11 @@ public class IgniteTxHandler {
                 top = firstEntry.context().topology();
 
                 top.readLock();
+
+                GridDhtTopologyFuture topFut = top.topologyVersionFuture();
+
+                if (!topFut.isDone())
+                    return null;
             }
 
             try {
@@ -499,6 +529,48 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param node Sender node.
+     * @param req Request.
+     * @return {@code True} if update will be retried from future listener.
+     */
+    private boolean waitForExchangeFuture(final ClusterNode node, final 
GridNearTxPrepareRequest req) {
+        assert req.firstClientRequest() : req;
+
+        GridDhtTopologyFuture topFut = ctx.exchange().lastTopologyFuture();
+
+        if (!topFut.isDone()) {
+            Thread curThread = Thread.currentThread();
+
+            if (curThread instanceof IgniteThread) {
+                final IgniteThread thread = (IgniteThread)curThread;
+
+                if (thread.hasStripeOrPolicy()) {
+                    topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            
ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {
+                                @Override public void run() {
+                                    processNearTxPrepareRequest0(node, req);
+                                }
+                            });
+                        }
+                    });
+
+                    return true;
+                }
+            }
+
+            try {
+                topFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Topology future failed: " + e, e);
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * @param expVer Expected topology version.
      * @param curVer Current topology version.
      * @param req Request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 01207e3..201815c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.LoadBalancerResource;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -779,13 +780,22 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param c Closure to execute.
-     * @param sys If {@code true}, then system pool will be used, otherwise 
public pool will be used.
-     * @return Future.
-     * @throws IgniteCheckedException Thrown in case of any errors.
+     * @param thread Thread.
+     * @param c Closure.
      */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, 
boolean sys) throws IgniteCheckedException {
-        return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : 
GridIoPolicy.PUBLIC_POOL);
+    public void runLocalWithThreadPolicy(IgniteThread thread, Runnable c) {
+        assert thread.stripe() >= 0 || thread.policy() != 
GridIoPolicy.UNDEFINED : thread;
+
+        if (thread.stripe() >= 0)
+            ctx.getStripedExecutorService().execute(thread.stripe(), c);
+        else {
+            try {
+                ctx.pools().poolForPolicy(thread.policy()).execute(c);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get pool for policy: " + 
thread.policy(), e);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
index c814625..f0d39fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
@@ -125,6 +125,10 @@ public class IgniteThread extends Thread {
         return stripe;
     }
 
+    public boolean hasStripeOrPolicy() {
+        return stripe >= 0 || plc != GridIoPolicy.UNDEFINED;
+    }
+
     /**
      * Gets name of the Ignite instance this thread belongs to.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/316dcc69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 011a01d..16bac3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -59,7 +59,9 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -76,7 +78,7 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     private boolean testSpi;
 
     /** */
-    private static String[] cacheNames = {"c1"/*, "c2", "c3", "c4", "c5", 
"c6", "c7", "c8", "c9", "c10"*/};
+    private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", 
"c7", "c8", "c9", "c10"};
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -96,16 +98,16 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         }
 
         cfg.setCacheConfiguration(
-            cacheConfiguration("c1", ATOMIC, PARTITIONED, 0)
-//            cacheConfiguration("c2", ATOMIC, PARTITIONED, 1),
-//            cacheConfiguration("c3", ATOMIC, PARTITIONED, 2),
-//            cacheConfiguration("c4", ATOMIC, PARTITIONED, 10),
-//            cacheConfiguration("c5", ATOMIC, REPLICATED, 0),
-//            cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0),
-//            cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1),
-//            cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2),
-//            cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10),
-//            cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0)
+            cacheConfiguration("c1", ATOMIC, PARTITIONED, 0),
+            cacheConfiguration("c2", ATOMIC, PARTITIONED, 1),
+            cacheConfiguration("c3", ATOMIC, PARTITIONED, 2),
+            cacheConfiguration("c4", ATOMIC, PARTITIONED, 10),
+            cacheConfiguration("c5", ATOMIC, REPLICATED, 0),
+            cacheConfiguration("c6", TRANSACTIONAL, PARTITIONED, 0),
+            cacheConfiguration("c7", TRANSACTIONAL, PARTITIONED, 1),
+            cacheConfiguration("c8", TRANSACTIONAL, PARTITIONED, 2),
+            cacheConfiguration("c9", TRANSACTIONAL, PARTITIONED, 10),
+            cacheConfiguration("c10", TRANSACTIONAL, REPLICATED, 0)
         );
 
         return cfg;
@@ -121,10 +123,15 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     /**
      * @param name Cache name.
      * @param atomicityMode Cache atomicity mode.
+     * @param cacheMode Cache mode.
      * @param backups Number of backups.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration(String name, 
CacheAtomicityMode atomicityMode, CacheMode cacheMode, int backups) {
+    private CacheConfiguration cacheConfiguration(String name,
+        CacheAtomicityMode atomicityMode,
+        CacheMode cacheMode,
+        int backups)
+    {
         CacheConfiguration ccfg = new CacheConfiguration(name);
 
         ccfg.setAtomicityMode(atomicityMode);

Reply via email to