zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbe9980b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbe9980b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbe9980b

Branch: refs/heads/ignite-zk
Commit: cbe9980b45d6135ef6f67610f751c1599df9e54a
Parents: 7e8f85f
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 14 14:48:26 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 14 18:04:38 2017 +0300

----------------------------------------------------------------------
 .../CommunicationProblemContext.java            |  62 ++++++++++
 .../CommunicationProblemResolver.java           |  28 +++++
 .../configuration/IgniteConfiguration.java      |  13 ++
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  49 ++++++++
 .../discovery/CommunicationProblemContext.java  |  62 ----------
 .../discovery/CommunicationProblemResolver.java |  28 -----
 .../DefaultCommunicationProblemResolver.java    |   2 +
 .../ZkCommunicationErrorProcessFuture.java      |  11 +-
 .../internal/ZkCommunicationProblemContext.java |  65 ++++++++++
 .../ZkDistributedCollectDataFuture.java         |   6 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  59 ++++++++--
 .../ZookeeperDiscoverySpiBasicTest.java         | 118 ++++++++++++++++++-
 13 files changed, 401 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java
new file mode 100644
index 0000000..9d53a97
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+
+/**
+ * Context passed to {@link CommunicationProblemResolver#resolve(CommunicationProblemContext)}: gives access to
+ * topology, connectivity and cache state, and lets the resolver choose nodes to kill.
+ */
+public interface CommunicationProblemContext {
+    /**
+     * @return Current topology snapshot.
+     */
+    public List<ClusterNode> topologySnapshot();
+
+    /**
+     * @param node1 First node.
+     * @param node2 Second node.
+     * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node.
+     */
+    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
+
+    /**
+     * @return Names of currently started caches.
+     */
+    public List<String> startedCaches();
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache partitions affinity assignment: list of nodes for each partition.
+     */
+    public List<List<ClusterNode>> cacheAffinity(String cacheName);
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache partitions owners: list of owner nodes for each partition.
+     */
+    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
+
+    /**
+     * @param node Node to kill.
+     */
+    public void killNode(ClusterNode node);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java
new file mode 100644
index 0000000..d1c6f27
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationProblemResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+/**
+ * Pluggable strategy (set via configuration) that decides how to handle communication problems between nodes.
+ */
+public interface CommunicationProblemResolver {
+    /**
+     * @param ctx Context with topology, connectivity and cache information; also used to choose nodes to kill.
+     */
+    public void resolve(CommunicationProblemContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index fc1fb6b..8c3c818 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -480,6 +480,9 @@ public class IgniteConfiguration {
     /** Client connector configuration. */
     private ClientConnectorConfiguration cliConnCfg = 
ClientListenerProcessor.DFLT_CLI_CFG;
 
+    /** Communication problem resolver, {@code null} if not configured. */
+    private CommunicationProblemResolver commProblemRslvr;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -507,6 +510,8 @@ public class IgniteConfiguration {
         loadBalancingSpi = cfg.getLoadBalancingSpi();
         indexingSpi = cfg.getIndexingSpi();
 
+        commProblemRslvr = cfg.getCommunicationProblemResolver();
+
         /*
          * Order alphabetically for maintenance purposes.
          */
@@ -590,6 +595,27 @@ public class IgniteConfiguration {
         warmupClos = cfg.getWarmupClosure();
     }
 
+    /**
+     * Gets resolver used to handle communication problems between nodes.
+     *
+     * @return Communication problem resolver or {@code null} if not set.
+     */
+    public CommunicationProblemResolver getCommunicationProblemResolver() {
+        return commProblemRslvr;
+    }
+
+    /**
+     * Sets resolver used to handle communication problems between nodes.
+     *
+     * @param commProblemRslvr Communication problem resolver.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setCommunicationProblemResolver(CommunicationProblemResolver commProblemRslvr) {
+        this.commProblemRslvr = commProblemRslvr;
+
+        return this;
+    }
+
     /**
      * Gets optional grid name. Returns {@code null} if non-default grid name 
was not
      * provided.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1d595d2..14d55d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -485,6 +485,20 @@ public class GridNioServer<T> {
         return fut;
     }
 
+    /**
+     * Closes the session by calling its worker's {@code close} method directly. As the name implies, it is
+     * intended to be called from the worker thread that owns the session.
+     *
+     * @param ses Session to close, must be a {@link GridSelectorNioSessionImpl}.
+     */
+    public void closeFromWorkerThread(GridNioSession ses) {
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
+
+        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+        ((AbstractNioClientWorker)ses0.worker()).close(ses0, null);
+    }
+
     /**
      * @param ses Session.
      * @param msg Message.
@@ -834,7 +842,7 @@ public class GridNioServer<T> {
                 NioOperationFuture<GridNioSession> req = new 
NioOperationFuture<>(ch, false, meta);
 
                 if (async) {
-                    assert meta != null;
+                    // assert meta != null;
 
                     req.op = NioOperation.CONNECT;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 0b6daa3..e815312 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -32,6 +32,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.AbstractInterruptibleChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,6 +82,7 @@ import 
org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
@@ -2564,6 +2566,119 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         sendMessage0(node, msg, null);
     }
 
+    /**
+     * Checks whether TCP connections can be established from the local node to the given nodes.
+     * <p>
+     * Addresses of every node are tried in order until a connection succeeds. Checks are done sequentially in
+     * the calling thread, so the returned future is already completed.
+     *
+     * @param nodes Nodes to ping.
+     * @return Future with bit set where bit {@code i} is set if connection to {@code nodes.get(i)} succeeded.
+     */
+    public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+        BitSet res = new BitSet(nodes.size());
+
+        for (int i = 0; i < nodes.size(); i++) {
+            if (pingNode(nodes.get(i)))
+                res.set(i);
+        }
+
+        return new org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl<>(res);
+    }
+
+    /**
+     * @param node Node to ping.
+     * @return {@code True} if connection to at least one of the node addresses was established.
+     */
+    private boolean pingNode(ClusterNode node) {
+        LinkedHashSet<InetSocketAddress> addrs;
+
+        try {
+            addrs = nodeAddresses(node);
+        }
+        catch (Exception e) {
+            log.warning("Failed to resolve addresses of node to ping [node=" + node.id() + ']', e);
+
+            return false;
+        }
+
+        for (InetSocketAddress addr : addrs) {
+            if (pingAddress(addr))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Opens a non-blocking connection to the given address and waits until it is either established or fails.
+     *
+     * @param addr Address to connect to.
+     * @return {@code True} if connection was established.
+     */
+    private boolean pingAddress(InetSocketAddress addr) {
+        SocketChannel ch = null;
+
+        // Once NIO session is created it is closed by the listener below, otherwise the channel is closed here.
+        boolean sesCreated = false;
+
+        try {
+            ch = SocketChannel.open();
+
+            ch.configureBlocking(false);
+
+            ch.socket().setTcpNoDelay(tcpNoDelay);
+            ch.socket().setKeepAlive(true);
+
+            // Non-blocking connect may complete immediately (e.g. local connection), NIO server is not needed then.
+            if (ch.connect(addr))
+                return true;
+
+            GridNioFuture<GridNioSession> fut = nioSrvr.createSession(ch, null, true,
+                new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() {
+                    @Override public void apply(IgniteInternalFuture<GridNioSession> sesFut) {
+                        try {
+                            // Connection is needed only to check reachability, close it right away.
+                            nioSrvr.closeFromWorkerThread(sesFut.get());
+                        }
+                        catch (Exception ignored) {
+                            // Connection failed: handled by the thread waiting on the future.
+                        }
+                    }
+                });
+
+            fut.get();
+
+            sesCreated = true;
+
+            return true;
+        }
+        catch (Exception e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to ping address [addr=" + addr + ", err=" + e + ']');
+
+            return false;
+        }
+        finally {
+            if (!sesCreated && ch != null)
+                closePingChannel(ch);
+        }
+    }
+
+    /**
+     * Closes channel used for ping, ignoring errors.
+     *
+     * @param ch Channel to close.
+     */
+    private static void closePingChannel(SocketChannel ch) {
+        try {
+            ch.close();
+        }
+        catch (Exception ignored) {
+            // No-op: channel was used only for reachability check.
+        }
+    }
+
     /**
      * Sends given message to destination node. Note that characteristics of 
the
      * exchange such as durability, guaranteed delivery or error notification 
is

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
deleted file mode 100644
index 71673f1..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemContext.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery;
-
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-
-/**
- *
- */
-public interface CommunicationProblemContext {
-    /**
-     * @return Current topology snapshot.
-     */
-    public List<ClusterNode> topologySnapshot();
-
-    /**
-     * @param node1 First node.
-     * @param node2 Second node.
-     * @return {@code True} if {@link CommunicationSpi} is able to establish 
connection from first node to second node.
-     */
-    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
-
-    /**
-     * @return List of currently started cache.
-     */
-    public List<String> startedCaches();
-
-    /**
-     * @param cacheName Cache name.
-     * @return Cache partitions affinity assignment.
-     */
-    public List<List<ClusterNode>> cacheAffinity(String cacheName);
-
-    /**
-     * @param cacheName Cache name.
-     * @return Cache partitions owners.
-     */
-    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
-
-    /**
-     * @param node Node to kill.
-     */
-    public void killNode(ClusterNode node);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
deleted file mode 100644
index a9b620b..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/CommunicationProblemResolver.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery;
-
-/**
- *
- */
-public interface CommunicationProblemResolver {
-    /**
-     * @param ctx Context.
-     */
-    public void resolve(CommunicationProblemContext ctx);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
index 4d0262d..b2d4bf0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DefaultCommunicationProblemResolver.java
@@ -20,6 +20,8 @@ package org.apache.ignite.spi.discovery;
 import java.util.BitSet;
 import java.util.List;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CommunicationProblemContext;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 15744a2..6812ab0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -30,11 +31,13 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * Future is created on each node when either connection error occurs or 
resolve communication error request
+ * received.
  */
 class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> 
implements IgniteSpiTimeoutObject, Runnable {
     /** */
@@ -144,8 +147,15 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen
      * @param nodes Nodes to ping.
      * @throws Exception If failed.
      */
-    void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, Collection<ClusterNode> nodes)
+    void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes)
         throws Exception {
+        Object commSpi = impl.spi.ignite().configuration().getCommunicationSpi();
+
+        // Connectivity check is implemented only by TcpCommunicationSpi, with other SPIs nodes are not pinged.
+        // Ping result is not used yet: node result is saved without data.
+        if (commSpi instanceof TcpCommunicationSpi)
+            ((TcpCommunicationSpi)commSpi).pingNodes(nodes);
+
         ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
new file mode 100644
index 0000000..fd11b55
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CommunicationProblemContext;
+
+/**
+ * {@link CommunicationProblemContext} created by ZooKeeper discovery for the configured resolver.
+ * <p>
+ * Only {@link #killNode(ClusterNode)} is implemented so far: it records nodes requested to be killed.
+ * The other methods are placeholders returning {@code null} or {@code false}.
+ */
+class ZkCommunicationProblemContext implements CommunicationProblemContext {
+    /** Nodes passed to {@link #killNode(ClusterNode)}. */
+    private final Set<ClusterNode> killedNodes = new HashSet<>();
+
+    /**
+     * @return Unmodifiable view of nodes requested to be killed.
+     */
+    Set<ClusterNode> killedNodes() {
+        return Collections.unmodifiableSet(killedNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> topologySnapshot() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> startedCaches() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void killNode(ClusterNode node) {
+        killedNodes.add(Objects.requireNonNull(node, "node"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index d33001b..e5d2356 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -78,8 +78,10 @@ class ZkDistributedCollectDataFuture extends 
GridFutureAdapter<Void> {
         if (remainingNodes.isEmpty())
             completeAndNotifyListener();
         else {
-            if (log.isInfoEnabled())
-                log.info("Initialize data collect future [futPath=" + futPath 
+ ", nodes=" + remainingNodes.size() + ']');
+            if (log.isInfoEnabled()) {
+                log.info("Initialize data collect future [futPath=" + futPath 
+ ", " +
+                    "remainingNodes=" + remainingNodes.size() + ']');
+            }
 
             rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 82d9c4b..65bf6e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -1043,6 +1044,22 @@ public class ZookeeperDiscoveryImpl {
             assert locNode.order() > 0 : locNode;
             assert rtState.evtsData != null;
 
+            UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
+
+            if (futId != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("New discovery coordinator will handle already 
started cluster-wide communication " +
+                        "error resolve [reqId=" + futId + ']');
+                }
+
+                ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+
+                ZkDistributedCollectDataFuture collectResFut = 
collectCommunicationStatusFuture(futId);
+
+                if (fut != null)
+                    fut.nodeResultCollectFuture(collectResFut);
+            }
+
             for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values())
                 evtData.initRemainingAcks(rtState.top.nodesByOrder.values());
 
@@ -2106,16 +2123,7 @@ public class ZookeeperDiscoveryImpl {
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         if (rtState.crd) {
-            ZkDistributedCollectDataFuture nodeResFut = new 
ZkDistributedCollectDataFuture(this, rtState, futPath,
-                new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        // Future is completed from ZK event thread.
-                        finishCommunicationResolveProcess(rtState);
-
-                        return null;
-                    }
-                }
-            );
+            ZkDistributedCollectDataFuture nodeResFut = 
collectCommunicationStatusFuture(msg.id);
 
             fut.nodeResultCollectFuture(nodeResFut);
         }
@@ -2128,14 +2136,35 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param futId Communication error resolve future ID, used to build the future ZooKeeper path.
+     * @return Collect data future whose callback calls {@link #onCommunicationResolveStatusReceived}.
+     * @throws Exception If failed.
+     */
+    private ZkDistributedCollectDataFuture collectCommunicationStatusFuture(UUID futId) throws Exception {
+        return new ZkDistributedCollectDataFuture(this, rtState, zkPaths.distributedFutureBasePath(futId),
+            new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    // Future is completed from ZK event thread.
+                    onCommunicationResolveStatusReceived(rtState);
+
+                    return null;
+                }
+            }
+        );
+    }
+
+    /**
      * @param rtState Runtime state.
      * @throws Exception If failed.
      */
-    private void finishCommunicationResolveProcess(ZkRuntimeState rtState) 
throws Exception {
+    private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) 
throws Exception {
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
         UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
+        if (log.isInfoEnabled())
+            log.info("Received communication status from all nodes, call 
resolver [reqId=" + futId + ']');
+
         assert futId != null;
 
         ZkCommunicationErrorResolveFinishMessage msg = new 
ZkCommunicationErrorResolveFinishMessage(futId);
@@ -2148,6 +2177,14 @@ public class ZookeeperDiscoveryImpl {
             rtState.zkClient,
             marshalZip(res));
 
+        CommunicationProblemResolver rslvr = 
spi.ignite().configuration().getCommunicationProblemResolver();
+
+        if (rslvr != null) {
+            ZkCommunicationProblemContext ctx = new 
ZkCommunicationProblemContext();
+
+            rslvr.resolve(ctx);
+        }
+
         evtsData.evtIdGen++;
 
         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(

http://git-wip-us.apache.org/repos/asf/ignite/blob/cbe9980b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index ec70be6..64fcd34 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.File;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,13 +67,14 @@ import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.CommunicationProblemContext;
-import org.apache.ignite.spi.discovery.CommunicationProblemResolver;
+import org.apache.ignite.configuration.CommunicationProblemContext;
+import org.apache.ignite.configuration.CommunicationProblemResolver;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -121,6 +123,9 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     private boolean testSockNio;
 
     /** */
+    private boolean testCommSpi;
+
+    /** */
     private int sesTimeout;
 
     /** */
@@ -139,7 +144,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     private boolean persistence;
 
     /** */
-    private CommunicationProblemResolver communicationProblemResolver;
+    private CommunicationProblemResolver commProblemRslvr;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -236,6 +241,12 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             cfg.setDataStorageConfiguration(memCfg);
         }
 
+        if (testCommSpi)
+            cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
+
+        if (commProblemRslvr != null)
+            cfg.setCommunicationProblemResolver(commProblemRslvr);
+
         return cfg;
     }
 
@@ -1792,7 +1803,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         assert nodes > 1;
 
         sesTimeout = 2000;
-        communicationProblemResolver = new NoOpCommunicationProblemResolver();
+        commProblemRslvr = new NoOpCommunicationProblemResolver();
 
         startGridsMultiThreaded(nodes);
 
@@ -1824,7 +1835,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     public void testNoOpCommunicationErrorResolve_3() throws Exception {
         // One node fails before sending communication status.
         sesTimeout = 2000;
-        communicationProblemResolver = new NoOpCommunicationProblemResolver();
+        commProblemRslvr = new NoOpCommunicationProblemResolver();
 
         startGridsMultiThreaded(3);
 
@@ -1864,6 +1875,64 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that {@code onCommunicationConnectionError} returns and the three remaining nodes form
+     * a topology when node 0 is stopped while the communication error resolve process is still in progress.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_4() throws Exception {
+        // Coordinator changes while resolve process is in progress.
+        testCommSpi = true;
+
+        sesTimeout = 2000;
+        commProblemRslvr = new NoOpCommunicationProblemResolver();
+
+        // Start node 0 alone first so it is the oldest node (presumably the coordinator stopped below).
+        startGrid(0);
+
+        startGridsMultiThreaded(1, 3);
+
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(3));
+
+        // Node 3's pingNodes() calls block on this latch until it is released below.
+        commSpi.pingLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(1));
+
+                spi.onCommunicationConnectionError(ignite(2).cluster().localNode(), new Exception("test"));
+
+                return null;
+            }
+        });
+
+        try {
+            U.sleep(1000);
+
+            assertFalse(fut.isDone());
+
+            stopGrid(0);
+        }
+        finally {
+            // Release the latch even if the checks above fail; otherwise threads blocked in
+            // node 3's pingNodes() are never woken up and test teardown can hang.
+            commSpi.pingLatch.countDown();
+        }
+
+        fut.get();
+
+        waitForTopology(3);
+    }
+
+    /**
+     * Checks that a node can ping another live node via {@link TcpCommunicationSpi#pingNodes(List)}.
+     * <p>
+     * TODO ZK: move to comm spi tests.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodesPing() throws Exception {
+        startGrids(3);
+
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+
+        List<ClusterNode> nodes = new ArrayList<>();
+
+        nodes.add(ignite(2).cluster().localNode());
+
+        // Block on the result so that a failed ping fails the test instead of being silently dropped.
+        // TODO(review): also assert on the returned BitSet once its semantics are confirmed
+        // (presumably bit i is set when nodes.get(i) answered the ping).
+        spi.pingNodes(nodes).get();
+    }
+
+    /**
      * @param dfltConsistenId Default consistent ID flag.
      * @throws Exception If failed.
      */
@@ -2298,6 +2367,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         }, 10000));
     }
 
+    /**
+     * @param node Node.
+     */
     private static void closeZkClient(Ignite node) {
         DiscoverySpi spi = node.configuration().getDiscoverySpi();
 
@@ -2322,6 +2394,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
     /**
      * @param spi Spi instance.
+     * @return Zookeeper client.
      */
     private static ZooKeeper zkClient(ZookeeperDiscoverySpi spi) {
         return GridTestUtils.getFieldValue(spi, "impl", "rtState", "zkClient", 
"zk");
@@ -2333,6 +2406,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @param log  Logger.
      * @param clients Clients.
      * @param disconnectedC Closure which will be run when client node 
disconnected.
+     * @param closeSock {@code True} to simulate reconnect by closing zk 
client's socket.
      * @throws Exception If failed.
      */
     public static void reconnectClientNodes(final IgniteLogger log,
@@ -2449,7 +2523,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @param latch Latch.
      * @throws Exception If failed.
      */
-    protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch 
latch) throws Exception {
+    private static void waitReconnectEvent(IgniteLogger log, CountDownLatch 
latch) throws Exception {
         if (!latch.await(30_000, MILLISECONDS)) {
             log.error("Failed to wait for reconnect event, will dump threads, 
latch count: " + latch.getCount());
 
@@ -2475,6 +2549,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      *
      */
+    @SuppressWarnings("MismatchedReadAndWriteOfArray")
     static class TestAffinityFunction extends RendezvousAffinityFunction {
         /** */
         private static final long serialVersionUID = 0L;
@@ -2536,4 +2611,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             // No-op.
         }
     }
+
+    /**
+     * Test communication SPI whose {@link #pingNodes(List)} can be held until a latch is released.
+     */
+    static class ZkTestCommunicationSpi extends TcpCommunicationSpi {
+        /** If not {@code null}, {@link #pingNodes(List)} waits for this latch before delegating to the parent. */
+        private volatile CountDownLatch pingLatch;
+
+        /**
+         * @param ignite Node.
+         * @return Node's communication SPI.
+         */
+        static ZkTestCommunicationSpi forNode(Ignite ignite) {
+            return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<BitSet> pingNodes(List<ClusterNode> nodes) {
+            // Read the volatile field once so the null check and the await use the same latch.
+            CountDownLatch pingLatch = this.pingLatch;
+
+            try {
+                if (pingLatch != null)
+                    pingLatch.await();
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt flag so callers up the stack can still observe the interruption.
+                Thread.currentThread().interrupt();
+
+                throw new IgniteException(e);
+            }
+
+            return super.pingNodes(nodes);
+        }
+    }
 }

Reply via email to