Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 2759dbcaa -> d2daf9fbc


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2daf9fb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2daf9fb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2daf9fb

Branch: refs/heads/ignite-zk
Commit: d2daf9fbc22d78985882913d9f924c1510365793
Parents: 2759dbc
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Jan 11 12:18:03 2018 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Jan 11 14:14:47 2018 +0300

----------------------------------------------------------------------
 .../CommunicationFailureContext.java            |   5 +-
 .../discovery/GridDiscoveryManager.java         |   5 +-
 .../managers/discovery/IgniteDiscoverySpi.java  |   4 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   4 +-
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   4 +-
 .../internal/ZkCommunicationFailureContext.java |   8 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  18 +-
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 508 ++++++++++++++++---
 8 files changed, 468 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
index 8a9906b..d75cfdc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.configuration;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 
@@ -38,9 +39,9 @@ public interface CommunicationFailureContext {
     public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
 
     /**
-     * @return List of currently started cache.
+     * @return Currently started caches.
      */
-    public List<String> startedCaches();
+    public Map<String, CacheConfiguration<?, ?>> startedCaches();
 
     /**
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index cc63384..c0ff6ba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -79,7 +79,6 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
@@ -2458,7 +2457,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if SPI supports communication error resolve.
      */
     private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) 
{
-        return spi instanceof IgniteDiscoverySpi && 
((IgniteDiscoverySpi)spi).supportsCommunicationErrorResolve();
+        return spi instanceof IgniteDiscoverySpi && 
((IgniteDiscoverySpi)spi).supportsCommunicationFailureResolve();
     }
 
     /**
@@ -2496,7 +2495,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         if (!supportsCommunicationErrorResolve(spi) || 
!supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi()))
             throw new UnsupportedOperationException();
 
-        ((IgniteDiscoverySpi)spi).resolveCommunicationError(node, err);
+        ((IgniteDiscoverySpi)spi).resolveCommunicationFailure(node, err);
     }
 
     /** Worker for network segment checks. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index bf117f1..9aa5d14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -57,11 +57,11 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
     /**
      * @return {@code True} if supports communication error resolve.
      */
-    public boolean supportsCommunicationErrorResolve();
+    public boolean supportsCommunicationFailureResolve();
 
     /**
      * @param node Problem node.
      * @param err Connection error.
      */
-    public void resolveCommunicationError(ClusterNode node, Exception err);
+    public void resolveCommunicationFailure(ClusterNode node, Exception err);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 292d67e..809a8c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2108,12 +2108,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     }
 
     /** {@inheritDoc} */
-    @Override public boolean supportsCommunicationErrorResolve() {
+    @Override public boolean supportsCommunicationFailureResolve() {
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void resolveCommunicationError(ClusterNode node, 
Exception err) {
+    @Override public void resolveCommunicationFailure(ClusterNode node, 
Exception err) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index a89d46a..fc1af6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -243,12 +243,12 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
     }
 
     /** {@inheritDoc} */
-    @Override public boolean supportsCommunicationErrorResolve() {
+    @Override public boolean supportsCommunicationFailureResolve() {
         return true;
     }
 
     /** {@inheritDoc} */
-    @Override public void resolveCommunicationError(ClusterNode node, 
Exception err) {
+    @Override public void resolveCommunicationFailure(ClusterNode node, 
Exception err) {
         impl.resolveCommunicationError(node, err);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
index c9521dc..d27b717 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationFailureContext.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CommunicationFailureContext;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -35,6 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
@@ -103,14 +105,14 @@ class ZkCommunicationFailureContext implements 
CommunicationFailureContext {
     }
 
     /** {@inheritDoc} */
-    @Override public List<String> startedCaches() {
+    @Override public Map<String, CacheConfiguration<?, ?>> startedCaches() {
         Map<Integer, DynamicCacheDescriptor> cachesMap = 
ctx.affinity().caches();
 
-        List<String> res = new ArrayList<>(cachesMap.size());
+        Map<String, CacheConfiguration<?, ?>> res = 
U.newHashMap(cachesMap.size());
 
         for (DynamicCacheDescriptor desc : cachesMap.values()) {
             if (desc.cacheType().userCache())
-                res.add(desc.cacheName());
+                res.put(desc.cacheName(), desc.cacheConfiguration());
         }
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c9328fc..75363e3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridLongList;
@@ -329,8 +330,12 @@ public class ZookeeperDiscoveryImpl {
 
                     fut.scheduleCheckOnTimeout();
                 }
-                else
+                else {
                     fut = commErrProcFut.get();
+
+                    if (fut == null)
+                        continue;
+                }
             }
 
             nodeStatusFut = fut.nodeStatusFuture(node);
@@ -4211,12 +4216,19 @@ public class ZookeeperDiscoveryImpl {
         void runOfWaitForExchange() throws Exception {
             GridCacheSharedContext cacheCtx = 
((IgniteKernal)spi.ignite()).context().cache().context();
 
-            IgniteInternalFuture<?> exchFut =
-                
cacheCtx.exchange().affinityReadyFuture(cacheCtx.discovery().topologyVersionEx());
+            final AffinityTopologyVersion topVer = 
cacheCtx.discovery().topologyVersionEx();
+
+            IgniteInternalFuture<?> exchFut = 
cacheCtx.exchange().affinityReadyFuture(topVer);
 
             if (exchFut != null && !exchFut.isDone()) {
+                if (log.isInfoEnabled())
+                    log.info("Wait for current exchange completion [topVer=" + 
topVer + ']');
+
                 exchFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> fut) {
+                        if (log.isInfoEnabled())
+                            log.info("Finished wait for current exchange 
completion [topVer=" + topVer + ']');
+
                         // Most probably listener is run from Ignite thread, 
run fake async operation to return to Zookeeper thread.
                         rtState.zkClient.existsAsync(zkPaths.aliveNodesDir, 
null, WaitExchangeCompletionCallback.this);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2daf9fb/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
index b6791c2..75ecb8c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.DiscoverySpiTestListener;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -75,7 +77,10 @@ import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
@@ -84,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -91,6 +97,7 @@ import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.plugin.security.SecuritySubject;
@@ -184,7 +191,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     private boolean persistence;
 
     /** */
-    private IgniteOutClosure<CommunicationFailureResolver> commProblemRslvr;
+    private IgniteOutClosure<CommunicationFailureResolver> commFailureRslvr;
 
     /** */
     private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth;
@@ -325,8 +332,8 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         if (testCommSpi)
             cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
 
-        if (commProblemRslvr != null)
-            cfg.setCommunicationFailureResolver(commProblemRslvr.apply());
+        if (commFailureRslvr != null)
+            cfg.setCommunicationFailureResolver(commFailureRslvr.apply());
 
         return cfg;
     }
@@ -464,7 +471,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
             checkInternalStructuresCleanup();
 
-            checkZkNodesCleanup();
+            // checkZkNodesCleanup();
         }
         finally {
             reset();
@@ -2366,26 +2373,26 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testNoOpCommunicationErrorResolve_1() throws Exception {
-        communicationErrorResolve_Simple(2);
+    public void testNoOpCommunicationFailureResolve_1() throws Exception {
+        communicationFailureResolve_Simple(2);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNoOpCommunicationErrorResolve_2() throws Exception {
-        communicationErrorResolve_Simple(10);
+        communicationFailureResolve_Simple(10);
     }
 
     /**
      * @param nodes Nodes number.
      * @throws Exception If failed.
      */
-    private void communicationErrorResolve_Simple(int nodes) throws Exception {
+    private void communicationFailureResolve_Simple(int nodes) throws 
Exception {
         assert nodes > 1;
 
         sesTimeout = 2000;
-        commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
 
         startGridsMultiThreaded(nodes);
 
@@ -2405,7 +2412,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
             ZookeeperDiscoverySpi spi = spi(ignite(idx1));
 
-            spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), 
new Exception("test"));
+            
spi.resolveCommunicationFailure(ignite(idx2).cluster().localNode(), new 
Exception("test"));
 
             checkInternalStructuresCleanup();
         }
@@ -2418,7 +2425,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
      */
     public void testNoOpCommunicationErrorResolve_3() throws Exception {
         sesTimeout = 2000;
-        commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
 
         startGridsMultiThreaded(3);
 
@@ -2433,7 +2440,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             @Override public Object call() {
                 ZookeeperDiscoverySpi spi = spi(ignite(0));
 
-                spi.resolveCommunicationError(ignite(1).cluster().localNode(), 
new Exception("test"));
+                
spi.resolveCommunicationFailure(ignite(1).cluster().localNode(), new 
Exception("test"));
 
                 return null;
             }
@@ -2466,13 +2473,13 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         testCommSpi = true;
 
         sesTimeout = 2000;
-        commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
 
         startGrid(0);
 
         startGridsMultiThreaded(1, 3);
 
-        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
+        ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.testSpi(ignite(3));
 
         commSpi.pingLatch = new CountDownLatch(1);
 
@@ -2480,7 +2487,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             @Override public Object call() {
                 ZookeeperDiscoverySpi spi = spi(ignite(1));
 
-                spi.resolveCommunicationError(ignite(2).cluster().localNode(), 
new Exception("test"));
+                
spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new 
Exception("test"));
 
                 return null;
             }
@@ -2508,13 +2515,13 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         testCommSpi = true;
 
         sesTimeout = 2000;
-        commProblemRslvr = NoOpCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
 
         startGrid(0);
 
         startGridsMultiThreaded(1, 3);
 
-        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
+        ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.testSpi(ignite(3));
 
         commSpi.pingStartLatch = new CountDownLatch(1);
         commSpi.pingLatch = new CountDownLatch(1);
@@ -2523,7 +2530,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             @Override public Object call() {
                 ZookeeperDiscoverySpi spi = spi(ignite(1));
 
-                spi.resolveCommunicationError(ignite(2).cluster().localNode(), 
new Exception("test"));
+                
spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new 
Exception("test"));
 
                 return null;
             }
@@ -2566,49 +2573,49 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillNode_1() throws Exception {
-        communicationErrorResolve_KillNodes(2, Collections.singleton(2L));
+        communicationFailureResolve_KillNodes(2, Collections.singleton(2L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillNode_2() throws Exception {
-        communicationErrorResolve_KillNodes(3, Collections.singleton(2L));
+        communicationFailureResolve_KillNodes(3, Collections.singleton(2L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillNode_3() throws Exception {
-        communicationErrorResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L));
+        communicationFailureResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillCoordinator_1() throws 
Exception {
-        communicationErrorResolve_KillNodes(2, Collections.singleton(1L));
+        communicationFailureResolve_KillNodes(2, Collections.singleton(1L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillCoordinator_2() throws 
Exception {
-        communicationErrorResolve_KillNodes(3, Collections.singleton(1L));
+        communicationFailureResolve_KillNodes(3, Collections.singleton(1L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillCoordinator_3() throws 
Exception {
-        communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L));
+        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillCoordinator_4() throws 
Exception {
-        communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L));
+        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L));
     }
 
     /**
@@ -2616,14 +2623,14 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
      * @param killNodes Nodes to kill by resolve process.
      * @throws Exception If failed.
      */
-    private void communicationErrorResolve_KillNodes(int startNodes, 
Collection<Long> killNodes) throws Exception {
+    private void communicationFailureResolve_KillNodes(int startNodes, 
Collection<Long> killNodes) throws Exception {
         testCommSpi = true;
 
-        commProblemRslvr = 
TestNodeKillCommunicationFailureResolver.factory(killNodes);
+        commFailureRslvr = 
TestNodeKillCommunicationFailureResolver.factory(killNodes);
 
         startGrids(startNodes);
 
-        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(0));
+        ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.testSpi(ignite(0));
 
         commSpi.checkRes = new BitSet(startNodes);
 
@@ -2643,7 +2650,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         assertNotNull(killNodeId);
 
         try {
-            spi.resolveCommunicationError(spi.getNode(killNodeId), new 
Exception("test"));
+            spi.resolveCommunicationFailure(spi.getNode(killNodeId), new 
Exception("test"));
 
             fail("Exception is not thrown");
         }
@@ -2666,11 +2673,11 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCommunicationErrorResolve_KillCoordinator_5() throws 
Exception {
+    public void testCommunicationFailureResolve_KillCoordinator_5() throws 
Exception {
         sesTimeout = 2000;
 
         testCommSpi = true;
-        commProblemRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY;
 
         startGrids(10);
 
@@ -2682,14 +2689,14 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             info("Iteration: " + i);
 
             for (Ignite node : G.allGrids())
-                ZkTestCommunicationSpi.spi(node).initCheckResult(10);
+                ZkTestCommunicationSpi.testSpi(node).initCheckResult(10);
 
             UUID crdId = ignite(crd).cluster().localNode().id();
 
             ZookeeperDiscoverySpi spi = spi(ignite(crd + 1));
 
             try {
-                spi.resolveCommunicationError(spi.getNode(crdId), new 
Exception("test"));
+                spi.resolveCommunicationFailure(spi.getNode(crdId), new 
Exception("test"));
 
                 fail("Exception is not thrown");
             }
@@ -2710,11 +2717,11 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCommunicationErrorResolve_KillRandom() throws Exception {
+    public void testCommunicationFailureResolve_KillRandom() throws Exception {
         sesTimeout = 2000;
 
         testCommSpi = true;
-        commProblemRslvr = KillRandomCommunicationFailureResolver.FACTORY;
+        commFailureRslvr = KillRandomCommunicationFailureResolver.FACTORY;
 
         startGridsMultiThreaded(10);
 
@@ -2730,7 +2737,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             ZookeeperDiscoverySpi spi = null;
 
             for (Ignite node : G.allGrids()) {
-                ZkTestCommunicationSpi.spi(node).initCheckResult(100);
+                ZkTestCommunicationSpi.testSpi(node).initCheckResult(100);
 
                 spi = spi(node);
             }
@@ -2738,7 +2745,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
             assert spi != null;
 
             try {
-                
spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
+                
spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
             }
             catch (IgniteSpiException ignore) {
                 // No-op.
@@ -2755,15 +2762,15 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testDefaultCommunicationErrorResolver1() throws Exception {
+    public void testDefaultCommunicationFailureResolver1() throws Exception {
         testCommSpi = true;
         sesTimeout = 5000;
 
         startGrids(3);
 
-        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 0, 1);
-        ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(3, 0, 1);
-        ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(3, 2);
+        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(3, 2);
 
         UUID killedId = nodeId(2);
 
@@ -2771,7 +2778,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
         ZookeeperDiscoverySpi spi = spi(ignite(0));
 
-        
spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()),
 new Exception("test"));
+        
spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()),
 new Exception("test"));
 
         waitForTopology(2);
 
@@ -2781,7 +2788,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testDefaultCommunicationErrorResolver2() throws Exception {
+    public void testDefaultCommunicationFailureResolver2() throws Exception {
         testCommSpi = true;
         sesTimeout = 5000;
 
@@ -2791,15 +2798,15 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
         startGridsMultiThreaded(3, 2);
 
-        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(5, 0, 1);
-        ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(5, 0, 1);
-        ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(5, 2, 3, 4);
-        ZkTestCommunicationSpi.spi(ignite(3)).initCheckResult(5, 2, 3, 4);
-        ZkTestCommunicationSpi.spi(ignite(4)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.testSpi(ignite(3)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.testSpi(ignite(4)).initCheckResult(5, 2, 3, 4);
 
         ZookeeperDiscoverySpi spi = spi(ignite(0));
 
-        
spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()),
 new Exception("test"));
+        
spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()),
 new Exception("test"));
 
         waitForTopology(2);
     }
@@ -2807,22 +2814,22 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testDefaultCommunicationErrorResolver3() throws Exception {
-        defaultCommunicationErrorResolver_BreakCommunication(3, 1);
+    public void testDefaultCommunicationFailureResolver3() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(3, 1);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDefaultCommunicationErrorResolver4() throws Exception {
-        defaultCommunicationErrorResolver_BreakCommunication(3, 0);
+    public void testDefaultCommunicationFailureResolver4() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(3, 0);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDefaultCommunicationErrorResolver5() throws Exception {
-        defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6);
+    public void testDefaultCommunicationFailureResolver5() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(10, 1, 3, 6);
     }
 
     /**
@@ -2830,7 +2837,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
      * @param breakNodes Node indices where communication server is closed.
      * @throws Exception If failed.
      */
-    private void defaultCommunicationErrorResolver_BreakCommunication(int 
startNodes, final int...breakNodes) throws Exception {
+    private void defaultCommunicationFailureResolver_BreakCommunication(int 
startNodes, final int...breakNodes) throws Exception {
         sesTimeout = 5000;
 
         startGridsMultiThreaded(startNodes);
@@ -2860,13 +2867,13 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCommunicationErrorResolve_CachesInfo1() throws Exception {
+    public void testCommunicationFailureResolve_CachesInfo1() throws Exception 
{
         testCommSpi = true;
         sesTimeout = 5000;
 
         final CacheInfoCommunicationFailureResolver rslvr = new 
CacheInfoCommunicationFailureResolver();
 
-        commProblemRslvr = new 
IgniteOutClosure<CommunicationFailureResolver>() {
+        commFailureRslvr = new 
IgniteOutClosure<CommunicationFailureResolver>() {
             @Override public CommunicationFailureResolver apply() {
                 return rslvr;
             }
@@ -2874,21 +2881,320 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
         startGrids(2);
 
-        ZookeeperDiscoverySpi spi = spi(ignite(0));
+        awaitPartitionMapExchange();
+
+        Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>();
+
+        expCaches.put(DEFAULT_CACHE_NAME, new 
T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        List<CacheConfiguration> caches = new ArrayList<>();
+
+        CacheConfiguration c1 = new CacheConfiguration("c1");
+        c1.setBackups(1);
+        c1.setAffinity(new RendezvousAffinityFunction(false, 64));
+        caches.add(c1);
+
+        CacheConfiguration c2 = new CacheConfiguration("c2");
+        c2.setBackups(2);
+        c2.setAffinity(new RendezvousAffinityFunction(false, 128));
+        caches.add(c2);
+
+        CacheConfiguration c3 = new CacheConfiguration("c3");
+        c3.setCacheMode(CacheMode.REPLICATED);
+        c3.setAffinity(new RendezvousAffinityFunction(false, 256));
+        caches.add(c3);
+
+        ignite(0).createCaches(caches);
+
+        expCaches.put("c1", new T3<>(64, 1, 2));
+        expCaches.put("c2", new T3<>(128, 2, 2));
+        expCaches.put("c3", new T3<>(256, 1, 2));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        startGrid(2);
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        expCaches.put("c2", new T3<>(128, 2, 3));
+        expCaches.put("c3", new T3<>(256, 1, 4));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        CacheConfiguration c4 = new CacheConfiguration("c4");
+        c4.setCacheMode(CacheMode.PARTITIONED);
+        c4.setBackups(0);
+        c4.setAffinity(new RendezvousAffinityFunction(false, 256));
+        c4.setNodeFilter(new 
TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), 
getTestIgniteInstanceName(1)));
+
+        ignite(2).createCache(c4);
+
+        expCaches.put("c4", new T3<>(256, 0, 1));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        stopGrid(0); // Stop current coordinator, check new coordinator will 
initialize required caches information.
+
+        awaitPartitionMapExchange();
+
+        expCaches.put("c3", new T3<>(256, 1, 3));
+
+        checkResolverCachesInfo(ignite(1), expCaches);
+
+        startGrid(0);
+
+        expCaches.put("c3", new T3<>(256, 1, 4));
+
+        checkResolverCachesInfo(ignite(1), expCaches);
+
+        stopGrid(1);
+
+        expCaches.put("c3", new T3<>(256, 1, 3));
+
+        checkResolverCachesInfo(ignite(3), expCaches);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_CachesInfo2() throws Exception 
{
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        final CacheInfoCommunicationFailureResolver rslvr = new 
CacheInfoCommunicationFailureResolver();
+
+        commFailureRslvr = new 
IgniteOutClosure<CommunicationFailureResolver>() {
+            @Override public CommunicationFailureResolver apply() {
+                return rslvr;
+            }
+        };
+
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration ccfg = new CacheConfiguration("c1");
+        ccfg.setBackups(1);
+
+        srv0.createCache(ccfg);
+
+        // Block rebalance to make sure node0 will be the only owner.
+        TestRecordingCommunicationSpi.spi(srv0).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridDhtPartitionSupplyMessage &&
+                    ((GridDhtPartitionSupplyMessage) msg).groupId() == 
CU.cacheId("c1");
+            }
+        });
+
+        startGrid(1);
+
+        U.sleep(1000);
+
+        ZookeeperDiscoverySpi spi = spi(srv0);
 
         rslvr.latch = new CountDownLatch(1);
 
-        ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(2, 0);
+        ZkTestCommunicationSpi.testSpi(srv0).initCheckResult(2, 0);
 
-        spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), 
new Exception("test"));
+        
spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
 
         assertTrue(rslvr.latch.await(10, SECONDS));
 
-        List<String> caches = Arrays.asList(DEFAULT_CACHE_NAME);
+        List<List<ClusterNode>> cacheOwners = rslvr.ownersMap.get("c1");
+
+        ClusterNode node0 = srv0.cluster().localNode();
+
+        for (int p = 0; p < RendezvousAffinityFunction.DFLT_PARTITION_COUNT; 
p++) {
+            List<ClusterNode> owners = cacheOwners.get(p);
+
+            assertEquals(1, owners.size());
+            assertEquals(node0, owners.get(0));
+        }
+
+        TestRecordingCommunicationSpi.spi(srv0).stopBlock();
+
+        awaitPartitionMapExchange();
 
-        Collections.sort(caches);
+        Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>();
 
-        assertEquals(caches, rslvr.caches);
+        expCaches.put(DEFAULT_CACHE_NAME, new 
T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1));
+        expCaches.put("c1", new 
T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 1, 2));
+
+        checkResolverCachesInfo(srv0, expCaches);
+    }
+
+    /**
+     * @param crd Coordinator node.
+     * @param expCaches Expected caches info.
+     * @throws Exception If failed.
+     */
+    private void checkResolverCachesInfo(Ignite crd, Map<String, T3<Integer, 
Integer, Integer>> expCaches)
+        throws Exception
+    {
+        CacheInfoCommunicationFailureResolver rslvr =
+            
(CacheInfoCommunicationFailureResolver)crd.configuration().getCommunicationFailureResolver();
+
+        assertNotNull(rslvr);
+
+        ZookeeperDiscoverySpi spi = spi(crd);
+
+        rslvr.latch = new CountDownLatch(1);
+
+        
ZkTestCommunicationSpi.testSpi(crd).initCheckResult(crd.cluster().nodes().size(),
 0);
+
+        
spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
+
+        assertTrue(rslvr.latch.await(10, SECONDS));
+
+        rslvr.checkCachesInfo(expCaches);
+
+        rslvr.reset();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testCommunicationFailureResolve_ConcurrentDiscoveyEvents() 
throws Exception {
+        sesTimeout = 5000;
+
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        final int INITIAL_NODES = 5;
+
+        startGridsMultiThreaded(INITIAL_NODES);
+
+        final CyclicBarrier b = new CyclicBarrier(4);
+
+        GridCompoundFuture<?, ?> fut = new GridCompoundFuture<>();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                b.await();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < 10; i++) {
+                    startGrid(i + INITIAL_NODES);
+
+                    Thread.sleep(rnd.nextLong(1000) + 10);
+
+                    if (stop.get())
+                        break;
+                }
+
+                return null;
+            }
+        }, "test-node-start"));
+
+        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                b.await();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!stop.get()) {
+                    startGrid(100);
+
+                    Thread.sleep(rnd.nextLong(1000) + 10);
+
+                    stopGrid(100);
+
+                    Thread.sleep(rnd.nextLong(1000) + 10);
+                }
+
+                return null;
+            }
+        }, "test-node-restart"));
+
+        fut.add((IgniteInternalFuture)GridTestUtils.runAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                b.await();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int idx = 0;
+
+                while (!stop.get()) {
+                    CacheConfiguration ccfg = new CacheConfiguration("c-" + 
idx++);
+                    ccfg.setBackups(rnd.nextInt(5));
+
+                    ignite(rnd.nextInt(INITIAL_NODES)).createCache(ccfg);
+
+                    Thread.sleep(rnd.nextLong(1000) + 10);
+
+                    
ignite(rnd.nextInt(INITIAL_NODES)).destroyCache(ccfg.getName());
+
+                    Thread.sleep(rnd.nextLong(1000) + 10);
+                }
+
+                return null;
+            }
+        }, "test-create-cache"));
+
+        fut.add((IgniteInternalFuture)GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    b.await();
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < 5; i++) {
+                        info("resolveCommunicationFailure: " + i);
+
+                        ZookeeperDiscoverySpi spi = 
spi(ignite(rnd.nextInt(INITIAL_NODES)));
+
+                        
spi.resolveCommunicationFailure(ignite(rnd.nextInt(INITIAL_NODES)).cluster().localNode(),
+                            new Exception("test"));
+                    }
+
+                    return null;
+                }
+                finally {
+                    stop.set(true);
+                }
+            }
+        }, 5, "test-resolve-failure"));
+
+        fut.markInitialized();
+
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_ConcurrentMultinode() throws 
Exception {
+        sesTimeout = 5000;
+
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        startGridsMultiThreaded(5);
+
+        client = true;
+
+        startGridsMultiThreaded(5, 5);
+
+        final int NODES = 10;
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                for (int i = 0; i < 5; i++) {
+                    info("resolveCommunicationFailure: " + i);
+
+                    ZookeeperDiscoverySpi spi = 
spi(ignite(rnd.nextInt(NODES)));
+
+                    
spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new 
Exception("test"));
+                }
+
+                return null;
+            }
+        }, 30, "test-resolve-failure");
     }
 
     /**
@@ -4100,7 +4406,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
         private IgniteLogger log;
 
         /** */
-        List<String> caches;
+        Map<String, CacheConfiguration<?, ?>>  caches;
 
         /** */
         Map<String, List<List<ClusterNode>>> affMap;
@@ -4118,22 +4424,82 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
 
             caches = ctx.startedCaches();
 
-            Collections.sort(caches);
-
-            log.info("Resolver called, started caches: " + caches);
+            log.info("Resolver called, started caches: " + caches.keySet());
 
             assertNotNull(caches);
 
             affMap = new HashMap<>();
             ownersMap = new HashMap<>();
 
-            for (String cache : caches) {
+            for (String cache : caches.keySet()) {
                 affMap.put(cache, ctx.cacheAffinity(cache));
                 ownersMap.put(cache, ctx.cachePartitionOwners(cache));
             }
 
             latch.countDown();
         }
+
+        /**
+         * @param expCaches Expected caches information (when late assignment 
doen and rebalance finished).
+         */
+        void checkCachesInfo(Map<String, T3<Integer, Integer, Integer>> 
expCaches) {
+            assertNotNull(caches);
+            assertNotNull(affMap);
+            assertNotNull(ownersMap);
+
+            for (Map.Entry<String, T3<Integer, Integer, Integer>> e : 
expCaches.entrySet()) {
+                String cacheName = e.getKey();
+
+                int parts = e.getValue().get1();
+                int backups = e.getValue().get2();
+                int expNodes = e.getValue().get3();
+
+                assertTrue(cacheName, caches.containsKey(cacheName));
+
+                CacheConfiguration ccfg = caches.get(cacheName);
+
+                assertEquals(cacheName, ccfg.getName());
+
+                if (ccfg.getCacheMode() == CacheMode.REPLICATED)
+                    assertEquals(Integer.MAX_VALUE, ccfg.getBackups());
+                else
+                    assertEquals(backups, ccfg.getBackups());
+
+                assertEquals(parts, ccfg.getAffinity().partitions());
+
+                List<List<ClusterNode>> aff = affMap.get(cacheName);
+
+                assertNotNull(cacheName, aff);
+                assertEquals(parts, aff.size());
+
+                List<List<ClusterNode>> owners = ownersMap.get(cacheName);
+
+                assertNotNull(cacheName, owners);
+                assertEquals(parts, owners.size());
+
+                for (int i = 0; i < parts; i++) {
+                    List<ClusterNode> partAff = aff.get(i);
+
+                    assertEquals(cacheName, expNodes, partAff.size());
+
+                    List<ClusterNode> partOwners = owners.get(i);
+
+                    assertEquals(cacheName, expNodes, partOwners.size());
+
+                    assertTrue(cacheName, partAff.containsAll(partOwners));
+                    assertTrue(cacheName, partOwners.containsAll(partAff));
+                }
+            }
+        }
+
+        /**
+         *
+         */
+        void reset() {
+            caches = null;
+            affMap = null;
+            ownersMap = null;
+        }
     }
 
     /**
@@ -4262,7 +4628,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
     /**
      *
      */
-    static class ZkTestCommunicationSpi extends TcpCommunicationSpi {
+    static class ZkTestCommunicationSpi extends TestRecordingCommunicationSpi {
         /** */
         private volatile CountDownLatch pingStartLatch;
 
@@ -4276,7 +4642,7 @@ public class ZookeeperDiscoverySpiTest extends 
GridCommonAbstractTest {
          * @param ignite Node.
          * @return Node's communication SPI.
          */
-        static ZkTestCommunicationSpi spi(Ignite ignite) {
+        static ZkTestCommunicationSpi testSpi(Ignite ignite) {
             return 
(ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi();
         }
 

Reply via email to