Repository: ignite
Updated Branches:
  refs/heads/ignite-zk-ce fc085a4a0 -> 9e4f2c20c


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e4f2c20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e4f2c20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e4f2c20

Branch: refs/heads/ignite-zk-ce
Commit: 9e4f2c20c2582655637c5abf349abc9e5473e9fe
Parents: fc085a4
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Dec 18 15:25:34 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Dec 18 18:08:47 2017 +0300

----------------------------------------------------------------------
 .../internal/ZkCommunicationErrorNodeState.java |   8 +-
 .../ZkCommunicationErrorProcessFuture.java      |  28 ++
 ...kCommunicationErrorResolveFinishMessage.java |   7 +-
 .../ZkCommunicationErrorResolveResult.java      |  15 +-
 .../internal/ZkCommunicationProblemContext.java |  67 +++-
 .../zk/internal/ZkDiscoveryEventsData.java      |   2 +-
 .../ZkDistributedCollectDataFuture.java         |  11 +-
 .../discovery/zk/internal/ZookeeperClient.java  |   2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 359 +++++++++++++++----
 .../ZookeeperDiscoverySpiBasicTest.java         | 138 ++++++-
 10 files changed, 540 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
index ddc310d..9c21f13 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -23,21 +23,23 @@ import java.util.BitSet;
 /**
  *
  */
-public class ZkCommunicationErrorNodeState implements Serializable {
+class ZkCommunicationErrorNodeState implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    private final BitSet commState;
+    final BitSet commState;
 
     /** */
-    private final Exception err;
+    final Exception err;
 
     /**
      * @param commState Communication state.
      * @param err Error if failed get communication state..
      */
     ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+        assert commState != null || err != null;
+
         this.commState = commState;
         this.err = err;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index a6294bd..0074817 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -214,6 +214,34 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
     }
 
     /**
+     * @param err Error.
+     */
+    void onError(Exception err) {
+        assert err != null;
+
+        Map<Long, GridFutureAdapter<Boolean>> futs;
+
+        synchronized (this) {
+            if (state == State.DONE) {
+                assert resErr != null;
+
+                return;
+            }
+
+            state = State.DONE;
+
+            resErr = err;
+
+            futs = nodeFuts; // nodeFuts should not be modified after state 
changed to DONE.
+        }
+
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet())
+            e.getValue().onDone(err);
+
+        onDone(err);
+    }
+
+    /**
      * @param failedNodes Node failed as result of resolve process.
      */
     void onFinishResolve(Set<Long> failedNodes) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 20aeddf..147b78f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -33,13 +33,18 @@ class ZkCommunicationErrorResolveFinishMessage implements 
DiscoverySpiCustomMess
     final UUID futId;
 
     /** */
+    final long topVer;
+
+    /** */
     transient ZkCommunicationErrorResolveResult res;
 
     /**
      * @param futId Future ID.
+     * @param topVer Topology version when resolve process finished.
      */
-    ZkCommunicationErrorResolveFinishMessage(UUID futId) {
+    ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) {
         this.futId = futId;
+        this.topVer = topVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
index 607f93b..68cbdb8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
-import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridIntList;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -29,12 +29,17 @@ class ZkCommunicationErrorResolveResult implements 
Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    final GridLongList failedNodes;
+    final GridIntList killedNodes;
+
+    /** */
+    final Exception err;
 
     /**
-     * @param failedNodes
+     * @param killedNodes Killed nodes.
+     * @param err Error.
      */
-    ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
-        this.failedNodes = failedNodes;
+    ZkCommunicationErrorResolveResult(@Nullable GridIntList killedNodes, 
Exception err) {
+        this.killedNodes = killedNodes;
+        this.err = err;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
index fd11b55..9cb48cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CommunicationProblemContext;
 
@@ -28,31 +33,73 @@ import 
org.apache.ignite.configuration.CommunicationProblemContext;
  */
 class ZkCommunicationProblemContext implements CommunicationProblemContext {
     /** */
+    private static final Comparator<ClusterNode> NODE_ORDER_CMP = new 
Comparator<ClusterNode>() {
+        @Override public int compare(ClusterNode node1, ClusterNode node2) {
+            return Long.compare(node1.order(), node2.order());
+        }
+    };
+
+    /** */
     private Set<ClusterNode> killedNodes = new HashSet<>();
 
+    /** */
+    private final Map<UUID, BitSet> nodesState;
+
+    /** */
+    private final List<ClusterNode> initialNodes;
+
+    /** */
+    private final List<ClusterNode> curNodes;
+
+    /**
+     * @param curNodes Current topology snapshot.
+     * @param initialNodes Topology snapshot when communication error resolve 
started.
+     * @param nodesState Nodes communication state.
+     */
+    ZkCommunicationProblemContext(List<ClusterNode> curNodes,
+        List<ClusterNode> initialNodes,
+        Map<UUID, BitSet> nodesState)
+    {
+        this.curNodes = Collections.unmodifiableList(curNodes);
+        this.initialNodes = initialNodes;
+        this.nodesState = nodesState;
+    }
+
     /** {@inheritDoc} */
     @Override public List<ClusterNode> topologySnapshot() {
-        return null;
+        return curNodes;
     }
 
     /** {@inheritDoc} */
     @Override public boolean connectionAvailable(ClusterNode node1, 
ClusterNode node2) {
-        return false;
+        BitSet nodeState = nodesState.get(node1.id());
+
+        if (nodeState == null)
+            throw new IllegalArgumentException("Invalid node: " + node1);
+
+        int nodeIdx = Collections.binarySearch(initialNodes, node2, 
NODE_ORDER_CMP);
+
+        if (nodeIdx < 0)
+            throw new IllegalArgumentException("Invalid node: " + node2);
+
+        assert nodeIdx < nodeState.size() : nodeIdx;
+
+        return nodeState.get(nodeIdx);
     }
 
     /** {@inheritDoc} */
     @Override public List<String> startedCaches() {
-        return null;
+        return null; // TODO ZK
     }
 
     /** {@inheritDoc} */
     @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) {
-        return null;
+        return null; // TODO ZK
     }
 
     /** {@inheritDoc} */
     @Override public List<List<ClusterNode>> cachePartitionOwners(String 
cacheName) {
-        return null;
+        return null; // TODO ZK
     }
 
     /** {@inheritDoc} */
@@ -60,6 +107,16 @@ class ZkCommunicationProblemContext implements 
CommunicationProblemContext {
         if (node == null)
             throw new NullPointerException();
 
+        if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0)
+            throw new IllegalArgumentException("Invalid node: " + node);
+
         killedNodes.add(node);
     }
+
+    /**
+     * @return Nodes to fail.
+     */
+    Set<ClusterNode> killedNodes() {
+        return killedNodes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index faea49e..6bdf573 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -69,7 +69,7 @@ class ZkDiscoveryEventsData implements Serializable {
     /**
      * @param id Future ID.
      */
-     void communicationErrorResolveFutureId(UUID id) {
+     void communicationErrorResolveFutureId(@Nullable UUID id) {
         commErrFutId = id;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
index 19e2acc..2467928 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -141,9 +141,14 @@ class ZkDistributedCollectDataFuture extends 
GridFutureAdapter<Void> {
         // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
         String evtDir = paths.distributedFutureBasePath(futId);
 
-        client.deleteAll(evtDir,
-            client.getChildren(evtDir),
-            -1);
+        try {
+            client.deleteAll(evtDir,
+                client.getChildren(evtDir),
+                -1);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // TODO ZK
+        }
 
         client.deleteIfExists(evtDir, -1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 5923b39..5c0bd58 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -940,7 +940,7 @@ public class ZookeeperClient implements Watcher {
             else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
                 U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
             else
-                assert rc == 0 : rc;
+                assert rc == 0 : KeeperException.Code.get(rc);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 62fc581..ef3504f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -50,6 +51,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -246,6 +248,8 @@ public class ZookeeperDiscoveryImpl {
      * @param err Connect error.
      */
     public void onCommunicationConnectionError(ClusterNode node0, Exception 
err) {
+        checkState();
+
         ZookeeperClusterNode node = node(node0.id());
 
         if (node == null)
@@ -269,7 +273,16 @@ public class ZookeeperDiscoveryImpl {
                             ", err= " + err + ']');
                     }
 
-                    fut.scheduleCheckOnTimeout();
+                    ConnectionState connState;
+
+                    synchronized (this) {
+                        connState = this.connState;
+                    }
+
+                    if (connState != ConnectionState.STARTED)
+                        fut.onError(new IgniteCheckedException("Node 
stopped."));
+                    else
+                        fut.scheduleCheckOnTimeout();
                 }
                 else
                     fut = commErrProcFut.get();
@@ -475,7 +488,7 @@ public class ZookeeperDiscoveryImpl {
                 break;
 
             case STOPPED:
-                throw new IgniteSpiException("Zookeeper client closed.");
+                throw new IgniteSpiException("Node stopped.");
 
             case DISCONNECTED:
                 throw new IgniteClientDisconnectedException(null, "Client is 
disconnected.");
@@ -1025,11 +1038,41 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param lastEvts Last events from previous coordinator.
+     * @throws Exception If failed.
+     */
+    private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) 
throws Exception {
+        for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) {
+            if (evtData instanceof ZkDiscoveryCustomEventData) {
+                ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
+
+                // It is possible previous coordinator failed before finished 
message processing.
+                if (evtData0.msg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
+                    try {
+                        ZkCommunicationErrorResolveFinishMessage msg =
+                            
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
+
+                        ZkCommunicationErrorResolveResult res = unmarshalZip(
+                            
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, 
msg.futId));
+
+                        deleteAliveNodes(res.killedNodes);
+                    }
+                    catch (KeeperException.NoNodeException ignore) {
+                        // No-op.
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * @param aliveNodes Alive nodes paths.
      * @param locInternalId Local node's internal ID.
      * @throws Exception If failed.
      */
     private void onBecomeCoordinator(List<String> aliveNodes, int 
locInternalId) throws Exception {
+        long topVer0 = rtState.evtsData != null ? rtState.evtsData.topVer : 
-1L;
+
         byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath);
 
         if (evtsDataBytes.length > 0)
@@ -1044,6 +1087,11 @@ public class ZookeeperDiscoveryImpl {
             assert locNode.order() > 0 : locNode;
             assert rtState.evtsData != null;
 
+            previousCoordinatorCleanup(rtState.evtsData);
+
+            if (topVer0 > rtState.evtsData.topVer)
+                rtState.evtsData.topVer = topVer0;
+
             UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
             if (futId != null) {
@@ -1636,6 +1684,7 @@ public class ZookeeperDiscoveryImpl {
 
                                 assert node != null :  msg0.nodeId;
 
+                                // TODO ZK: delete when process event
                                 for (String child : 
zkClient.getChildren(zkPaths.aliveNodesDir)) {
                                     if (ZkIgnitePaths.aliveInternalId(child) 
== node.internalId()) {
                                         
zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
@@ -2021,22 +2070,25 @@ public class ZookeeperDiscoveryImpl {
         ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId);
 
         if (msg.warning != null) {
-            U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
-                "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
-                ", nodeId=" + msg.nodeId +
-                ", msg=" + msg.warning + ']');
+            U.warn(log, "Received force EVT_NODE_FAILED event with warning [" +
+                "nodeId=" + msg.nodeId +
+                ", msg=" + msg.warning +
+                ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) + ']');
         }
         else {
             U.warn(log, "Received force EVT_NODE_FAILED event [" +
-                "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
-                ", nodeId=" + msg.nodeId + ']');
+                "nodeId=" + msg.nodeId +
+                ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) + ']');
         }
 
         ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId);
 
         assert node != null : msg.nodeId;
 
-        processNodeFail(node.internalId(), evtData.topologyVersion());
+        if (node.isLocal())
+            throw localNodeFail("Received force EVT_NODE_FAILED event for 
local node.");
+        else
+            notifyNodeFail(node.internalId(), evtData.topologyVersion());
     }
 
     /**
@@ -2049,13 +2101,10 @@ public class ZookeeperDiscoveryImpl {
         UUID futId = msg.futId;
 
         assert futId != null;
-        assert 
futId.equals(rtState.evtsData.communicationErrorResolveFutureId());
 
         if (log.isInfoEnabled())
             log.info("Received communication error resolve finish message 
[reqId=" + futId + ']');
 
-        rtState.evtsData.communicationErrorResolveFutureId(null);
-
         rtState.commErrProcNodes = null;
 
         ZkCommunicationErrorResolveResult res = msg.res;
@@ -2069,14 +2118,72 @@ public class ZookeeperDiscoveryImpl {
 
         Set<Long> failedNodes = null;
 
-        if (res.failedNodes != null) {
-            failedNodes = U.newHashSet(res.failedNodes.size());
+        if (res.err != null)
+            U.error(log, "Communication error resolve failed: " + res.err, 
res.err);
+        else {
+            if (res.killedNodes != null) {
+                failedNodes = U.newHashSet(res.killedNodes.size());
+
+                for (int i = 0; i < res.killedNodes.size(); i++) {
+                    int internalId = res.killedNodes.get(i);
+
+                    if (internalId == locNode.internalId()) {
+                        fut.onError(new IgniteCheckedException("Local node is 
forced to stop " +
+                            "by communication error resolver"));
+
+                        if (rtState.crd)
+                            deleteAliveNodes(res.killedNodes);
+
+                        throw localNodeFail("Local node is forced to stop by 
communication error resolver " +
+                            "[nodeId=" + locNode.id() + ']');
+                    }
+
+                    ZookeeperClusterNode node = 
rtState.top.nodesByInternalId.get(internalId);
+
+                    assert node != null : internalId;
+
+                    failedNodes.add(node.order());
+                }
+
+                long topVer = msg.topVer;
+
+                for (int i = 0; i < res.killedNodes.size(); i++) {
+                    int nodeInternalId = res.killedNodes.get(i);
 
-            for (int i = 0; i < res.failedNodes.size(); i++)
-                failedNodes.add(res.failedNodes.get(i));
+                    ClusterNode node = 
rtState.top.nodesByInternalId.get(nodeInternalId);
+
+                    assert node != null : nodeInternalId;
+
+                    if (log.isInfoEnabled())
+                        log.info("Node stop is forced by communication error 
resolver [nodeId=" + node.id() + ']');
+
+                    notifyNodeFail(nodeInternalId, ++topVer);
+                }
+            }
         }
 
         fut.onFinishResolve(failedNodes);
+
+        if (rtState.crd)
+            deleteAliveNodes(res.killedNodes);
+    }
+
+    /**
+     * @param internalIds Nodes internal IDs.
+     * @throws Exception If failed.
+     */
+    private void deleteAliveNodes(@Nullable GridIntList internalIds) throws 
Exception {
+        if (internalIds == null)
+            return;
+
+        List<String> alives = 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
+
+        for (int i = 0; i < alives.size(); i++) {
+            String alive = alives.get(i);
+
+            if (internalIds.contains(ZkIgnitePaths.aliveInternalId(alive)))
+                rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + 
"/" + alive);
+        }
     }
 
     /**
@@ -2163,7 +2270,7 @@ public class ZookeeperDiscoveryImpl {
     private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState 
rtState) throws Exception {
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
-        UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
+        UUID futId = evtsData.communicationErrorResolveFutureId();
 
         if (log.isInfoEnabled())
             log.info("Received communication status from all nodes [reqId=" + 
futId + ']');
@@ -2178,43 +2285,113 @@ public class ZookeeperDiscoveryImpl {
 
         rtState.commErrProcNodes = null;
 
-        ZkClusterNodes top = rtState.top;
+        List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-        List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>();
+        Map<UUID, BitSet> nodesRes = U.newHashMap(topSnapshot.size());
 
-        for (ZookeeperClusterNode node : top.nodesByOrder.values()) {
+        Exception err = null;
+
+        for (ClusterNode node : topSnapshot) {
             byte[] stateBytes = 
ZkDistributedCollectDataFuture.readNodeResult(futPath,
                 rtState.zkClient,
                 node.order());
 
             ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes);
 
-            nodesRes.add(nodeState);
+            if (nodeState.err != null) {
+                if (err == null)
+                    err = new Exception("Failed to resolve communication 
error.");
+
+                err.addSuppressed(nodeState.err);
+            }
+            else {
+                assert nodeState.commState != null;
+
+                nodesRes.put(node.id(), nodeState.commState);
+            }
         }
 
-        ZkCommunicationErrorResolveFinishMessage msg = new 
ZkCommunicationErrorResolveFinishMessage(futId);
+        long topVer = evtsData.topVer;
 
-        ZkCommunicationErrorResolveResult res = new 
ZkCommunicationErrorResolveResult(null);
+        GridIntList killedNodesList = null;
 
-        msg.res = res;
+        if (err == null) {
+            boolean fullyConnected = true;
 
-        
ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId),
-            rtState.zkClient,
-            marshalZip(res));
+            for (Map.Entry<UUID, BitSet> e : nodesRes.entrySet()) {
+                if (!checkFullyConnected(e.getValue(), initialNodes, 
rtState.top)) {
+                    fullyConnected = false;
+
+                    break;
+                }
+            }
 
-        CommunicationProblemResolver rslvr = 
spi.ignite().configuration().getCommunicationProblemResolver();
+            if (fullyConnected) {
+                if (log.isInfoEnabled()) {
+                    log.info("Finish communication error resolve process 
automatically, there are no " +
+                        "communication errors [reqId=" + futId + ']');
+                }
+            }
+            else {
+                CommunicationProblemResolver rslvr = 
spi.ignite().configuration().getCommunicationProblemResolver();
+
+                if (rslvr != null) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Call communication error resolver [reqId=" + 
futId +
+                            ", rslvr=" + rslvr.getClass().getSimpleName() + 
']');
+                    }
 
-        if (rslvr != null) {
-            ZkCommunicationProblemContext ctx = new 
ZkCommunicationProblemContext();
+                    ZkCommunicationProblemContext ctx = new 
ZkCommunicationProblemContext(topSnapshot,
+                        initialNodes,
+                        nodesRes);
 
-            rslvr.resolve(ctx);
+                    try {
+                        rslvr.resolve(ctx);
+
+                        Set<ClusterNode> killedNodes = ctx.killedNodes();
+
+                        if (killedNodes != null) {
+                            if (log.isInfoEnabled()) {
+                                log.info("Communication error resolver forces 
nodes stop [reqId=" + futId +
+                                    ", killNodeCnt=" + killedNodes.size() +
+                                    ", nodeIds=" + U.nodeIds(killedNodes) + 
']');
+                            }
+
+                            killedNodesList = new 
GridIntList(killedNodes.size());
+
+                            for (ClusterNode killedNode : killedNodes) {
+                                
killedNodesList.add(((ZookeeperClusterNode)killedNode).internalId());
+
+                                evtsData.topVer++;
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        err = e;
+
+                        U.error(log, "Failed to resolve communication error 
with configured resolver [reqId=" + futId + ']', e);
+                    }
+                }
+            }
         }
 
+        evtsData.communicationErrorResolveFutureId(null);
+
+        ZkCommunicationErrorResolveResult res = new 
ZkCommunicationErrorResolveResult(killedNodesList, err);
+
+        ZkCommunicationErrorResolveFinishMessage msg = new 
ZkCommunicationErrorResolveFinishMessage(futId, topVer);
+
+        msg.res = res;
+
+        
ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId),
+            rtState.zkClient,
+            marshalZip(res));
+
         evtsData.evtIdGen++;
 
         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
             evtsData.evtIdGen,
-            evtsData.topVer,
+            topVer,
             locNode.id(),
             msg,
             null,
@@ -2228,6 +2405,30 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param commState Node communication state.
+     * @param initialNodes Topology snapshot when communication error resolve 
started.
+     * @param top Current topology.
+     * @return {@code True} if node has connection to all alive nodes.
+     */
+    private boolean checkFullyConnected(BitSet commState, List<ClusterNode> 
initialNodes, ZkClusterNodes top) {
+        int startIdx = 0;
+
+        for (;;) {
+            int idx = commState.nextClearBit(startIdx);
+
+            if (idx >= initialNodes.size())
+                return true;
+
+            ClusterNode node = initialNodes.get(idx);
+
+            if (top.nodesById.containsKey(node.id()))
+                return false;
+
+            startIdx = idx + 1;
+        }
+    }
+
+    /**
      *
      */
     public void simulateNodeFailure() {
@@ -2292,70 +2493,73 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param evtData Event data.
-     * @throws Exception If failed.
      */
-    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) 
throws Exception {
-        processNodeFail(evtData.failedNodeInternalId(), 
evtData.topologyVersion());
+    private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) {
+        notifyNodeFail(evtData.failedNodeInternalId(), 
evtData.topologyVersion());
     }
 
     /**
-     * @param nodeInternalId Failed node internal ID.
+     * @param nodeInternalOrder Node order.
      * @param topVer Topology version.
-     * @throws Exception If failed.
      */
-    private void processNodeFail(int nodeInternalId, long topVer) throws 
Exception {
-        final ZookeeperClusterNode failedNode = 
rtState.top.removeNode(nodeInternalId);
+    private void notifyNodeFail(int nodeInternalOrder, long topVer) {
+        final ZookeeperClusterNode failedNode = 
rtState.top.removeNode(nodeInternalOrder);
 
-        assert failedNode != null;
+        assert failedNode != null && !failedNode.isLocal() : failedNode;
 
-        if (failedNode.isLocal()) {
-            U.warn(log, "Received EVT_NODE_FAILED for local node.");
+        PingFuture pingFut = pingFuts.get(failedNode.order());
 
-            rtState.onCloseStart();
+        if (pingFut != null)
+            pingFut.onDone(false);
 
-            if (locNode.isClient() && clientReconnectEnabled) {
-                boolean reconnect = false;
+        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-                synchronized (stateMux) {
-                    if (connState == ConnectionState.STARTED) {
-                        reconnect = true;
+        lsnr.onDiscovery(EVT_NODE_FAILED,
+            topVer,
+            failedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+    }
 
-                        connState = ConnectionState.DISCONNECTED;
-                    }
-                }
+    /**
+     * @param msg Message to log.
+     * @return Exception to be thrown.
+     */
+    private Exception localNodeFail(String msg) {
+        U.warn(log, msg);
+
+        rtState.onCloseStart();
 
-                if (reconnect) {
-                    UUID newId = UUID.randomUUID();
+        if (clientReconnectEnabled) {
+            assert locNode.isClient() : locNode;
 
-                    U.quietAndWarn(log, "Received EVT_NODE_FAILED for local 
node, will try to reconnect with new id [" +
-                        "newId=" + newId +
-                        ", prevId=" + locNode.id() +
-                        ", locNode=" + locNode + ']');
+            boolean reconnect = false;
 
-                    runInWorkerThread(new ReconnectClosure(newId));
+            synchronized (stateMux) {
+                if (connState == ConnectionState.STARTED) {
+                    reconnect = true;
+
+                    connState = ConnectionState.DISCONNECTED;
                 }
             }
-            else
-                notifySegmented();
 
-            // Stop any further processing.
-            throw new ZookeeperClientFailedException("Received node failed 
event for local node.");
-        }
-        else {
-            PingFuture pingFut = pingFuts.get(failedNode.order());
-
-            if (pingFut != null)
-                pingFut.onDone(false);
+            if (reconnect) {
+                UUID newId = UUID.randomUUID();
 
-            final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
+                U.quietAndWarn(log, "Client node will try to reconnect with 
new id [" +
+                    "newId=" + newId +
+                    ", prevId=" + locNode.id() +
+                    ", locNode=" + locNode + ']');
 
-            lsnr.onDiscovery(EVT_NODE_FAILED,
-                topVer,
-                failedNode,
-                topSnapshot,
-                Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                null);
+                runInWorkerThread(new ReconnectClosure(newId));
+            }
         }
+        else
+            notifySegmented();
+
+        // Stop any further processing.
+        return new ZookeeperClientFailedException(msg);
     }
 
     /**
@@ -2629,6 +2833,11 @@ public class ZookeeperDiscoveryImpl {
         if (zkClient != null)
             zkClient.close();
 
+        ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
+
+        if (commErrFut != null)
+            commErrFut.onError(new IgniteCheckedException("Node stopped."));
+
         IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, 
log);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index cee2e76..a29f478 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.File;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +64,7 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -1712,6 +1715,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * TODO ZK
+     *
      * @throws Exception If failed.
      */
     public void _testCommunicationFailure() throws Exception {
@@ -1918,6 +1923,97 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCommunicationErrorResolve_KillNode_1() throws Exception {
+        communicationErrorResolve_KillNodes(2, Collections.singletonList(2L));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillNode_2() throws Exception {
+        communicationErrorResolve_KillNodes(3, Collections.singletonList(2L));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillNode_3() throws Exception {
+        communicationErrorResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L));
+    }
+
+    /**
+     * @param startNodes Number of nodes to start.
+     * @param killNodes Nodes to kill by resolve process.
+     * @throws Exception If failed.
+     */
+    private void communicationErrorResolve_KillNodes(int startNodes, 
Collection<Long> killNodes) throws Exception {
+        testCommSpi = true;
+
+        commProblemRslvr = new 
TestNodeKillCommunicationProblemResolver(killNodes);
+
+        startGrids(startNodes);
+
+        ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.forNode(ignite(0));
+
+        commSpi.checkRes = new BitSet(startNodes);
+
+        ZookeeperDiscoverySpi spi = null;
+
+        for (Ignite node : G.allGrids()) {
+            ZookeeperDiscoverySpi spi0 = spi(node);
+
+            if (!killNodes.contains(node.cluster().localNode().order())) {
+                spi = spi0;
+
+                break;
+            }
+        }
+
+        assertNotNull(spi);
+
+        spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), 
new Exception("test"));
+
+        int expNodes = startNodes - killNodes.size();
+
+        waitForTopology(expNodes);
+
+        for (Ignite node : G.allGrids())
+            
assertFalse(killNodes.contains(node.cluster().localNode().order()));
+
+        startGrid(startNodes);
+
+        waitForTopology(expNodes + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillCoordinator() throws 
Exception {
+        // Kill coordinator.
+        testCommSpi = true;
+
+        commProblemRslvr = new 
TestNodeKillCommunicationProblemResolver(Collections.singleton(1L));
+
+        startGrids(3);
+
+        ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.forNode(ignite(2));
+
+        commSpi.checkRes = new BitSet(3);
+
+        ZookeeperDiscoverySpi spi = spi(ignite(1));
+
+        spi.onCommunicationConnectionError(ignite(0).cluster().localNode(), 
new Exception("test"));
+
+        waitForTopology(2);
+
+        startGrid(10);
+
+        waitForTopology(3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionCheck() throws Exception {
        final int NODES = 5;
 
@@ -1928,9 +2024,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
            TcpCommunicationSpi spi = 
(TcpCommunicationSpi)node.configuration().getCommunicationSpi();
 
-           List<ClusterNode> nodes = new ArrayList<>();
-
-           nodes.addAll(node.cluster().nodes());
+           List<ClusterNode> nodes = new ArrayList<>(node.cluster().nodes());
 
            BitSet res = spi.checkConnection(nodes).get();
 
@@ -2622,10 +2716,40 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      *
      */
+    static class TestNodeKillCommunicationProblemResolver implements 
CommunicationProblemResolver {
+        /** */
+        final Collection<Long> killNodeOrders;
+
+        /**
+         * @param killOrders Killed nodes order.
+         */
+        TestNodeKillCommunicationProblemResolver(Collection<Long> killOrders) {
+            this.killNodeOrders = killOrders;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resolve(CommunicationProblemContext ctx) {
+            List<ClusterNode> nodes = ctx.topologySnapshot();
+
+            assertTrue(nodes.size() > 0);
+
+            for (ClusterNode node : nodes) {
+                if (killNodeOrders.contains(node.order()))
+                    ctx.killNode(node);
+            }
+        }
+    }
+
+    /**
+     *
+     */
     static class ZkTestCommunicationSpi extends TcpCommunicationSpi {
         /** */
         private volatile CountDownLatch pingLatch;
 
+        /** */
+        private volatile BitSet checkRes;
+
         /**
          * @param ignite Node.
          * @return Node's communication SPI.
@@ -2646,6 +2770,14 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                 throw new IgniteException(e);
             }
 
+            BitSet checkRes = this.checkRes;
+
+            if (checkRes != null) {
+                this.checkRes = null;
+
+                return new IgniteFinishedFutureImpl<>(checkRes);
+            }
+
             return super.checkConnection(nodes);
         }
     }

Reply via email to