zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e4f2c20 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e4f2c20 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e4f2c20 Branch: refs/heads/ignite-zk Commit: 9e4f2c20c2582655637c5abf349abc9e5473e9fe Parents: fc085a4 Author: sboikov <sboi...@gridgain.com> Authored: Mon Dec 18 15:25:34 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Dec 18 18:08:47 2017 +0300 ---------------------------------------------------------------------- .../internal/ZkCommunicationErrorNodeState.java | 8 +- .../ZkCommunicationErrorProcessFuture.java | 28 ++ ...kCommunicationErrorResolveFinishMessage.java | 7 +- .../ZkCommunicationErrorResolveResult.java | 15 +- .../internal/ZkCommunicationProblemContext.java | 67 +++- .../zk/internal/ZkDiscoveryEventsData.java | 2 +- .../ZkDistributedCollectDataFuture.java | 11 +- .../discovery/zk/internal/ZookeeperClient.java | 2 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 359 +++++++++++++++---- .../ZookeeperDiscoverySpiBasicTest.java | 138 ++++++- 10 files changed, 540 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java index ddc310d..9c21f13 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java @@ -23,21 +23,23 @@ import java.util.BitSet; /** * */ -public class ZkCommunicationErrorNodeState implements Serializable { +class ZkCommunicationErrorNodeState implements Serializable { /** */ private static final long serialVersionUID = 0L; /** */ - private final BitSet commState; + final BitSet commState; /** */ - private final Exception err; + final Exception err; /** * @param commState Communication state. * @param err Error if failed get communication state.. */ ZkCommunicationErrorNodeState(BitSet commState, Exception err) { + assert commState != null || err != null; + this.commState = commState; this.err = err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index a6294bd..0074817 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -214,6 +214,34 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen } /** + * @param err Error. + */ + void onError(Exception err) { + assert err != null; + + Map<Long, GridFutureAdapter<Boolean>> futs; + + synchronized (this) { + if (state == State.DONE) { + assert resErr != null; + + return; + } + + state = State.DONE; + + resErr = err; + + futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE. + } + + for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) + e.getValue().onDone(err); + + onDone(err); + } + + /** * @param failedNodes Node failed as result of resolve process. */ void onFinishResolve(Set<Long> failedNodes) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java index 20aeddf..147b78f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java @@ -33,13 +33,18 @@ class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMess final UUID futId; /** */ + final long topVer; + + /** */ transient ZkCommunicationErrorResolveResult res; /** * @param futId Future ID. + * @param topVer Topology version when resolve process finished. */ - ZkCommunicationErrorResolveFinishMessage(UUID futId) { + ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) { this.futId = futId; + this.topVer = topVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java index 607f93b..68cbdb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java @@ -18,7 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; -import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridIntList; import org.jetbrains.annotations.Nullable; /** @@ -29,12 +29,17 @@ class ZkCommunicationErrorResolveResult implements Serializable { private static final long serialVersionUID = 0L; /** */ - final GridLongList failedNodes; + final GridIntList killedNodes; + + /** */ + final Exception err; /** - * @param failedNodes + * @param killedNodes Killed nodes. + * @param err Error. */ - ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) { - this.failedNodes = failedNodes; + ZkCommunicationErrorResolveResult(@Nullable GridIntList killedNodes, Exception err) { + this.killedNodes = killedNodes; + this.err = err; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java index fd11b55..9cb48cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationProblemContext.java @@ -17,9 +17,14 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CommunicationProblemContext; @@ -28,31 +33,73 @@ import org.apache.ignite.configuration.CommunicationProblemContext; */ class ZkCommunicationProblemContext implements CommunicationProblemContext { /** */ + private static final Comparator<ClusterNode> NODE_ORDER_CMP = new Comparator<ClusterNode>() { + @Override public int compare(ClusterNode node1, ClusterNode node2) { + return Long.compare(node1.order(), node2.order()); + } + }; + + /** */ private Set<ClusterNode> killedNodes = new HashSet<>(); + /** */ + private final Map<UUID, BitSet> nodesState; + + /** */ + private final List<ClusterNode> initialNodes; + + /** */ + private final List<ClusterNode> curNodes; + + /** + * @param curNodes Current topology snapshot. + * @param initialNodes Topology snapshot when communication error resolve started. + * @param nodesState Nodes communication state. + */ + ZkCommunicationProblemContext(List<ClusterNode> curNodes, + List<ClusterNode> initialNodes, + Map<UUID, BitSet> nodesState) + { + this.curNodes = Collections.unmodifiableList(curNodes); + this.initialNodes = initialNodes; + this.nodesState = nodesState; + } + /** {@inheritDoc} */ @Override public List<ClusterNode> topologySnapshot() { - return null; + return curNodes; } /** {@inheritDoc} */ @Override public boolean connectionAvailable(ClusterNode node1, ClusterNode node2) { - return false; + BitSet nodeState = nodesState.get(node1.id()); + + if (nodeState == null) + throw new IllegalArgumentException("Invalid node: " + node1); + + int nodeIdx = Collections.binarySearch(initialNodes, node2, NODE_ORDER_CMP); + + if (nodeIdx < 0) + throw new IllegalArgumentException("Invalid node: " + node2); + + assert nodeIdx < nodeState.size() : nodeIdx; + + return nodeState.get(nodeIdx); } /** {@inheritDoc} */ @Override public List<String> startedCaches() { - return null; + return null; // TODO ZK } /** {@inheritDoc} */ @Override public List<List<ClusterNode>> cacheAffinity(String cacheName) { - return null; + return null; // TODO ZK } /** {@inheritDoc} */ @Override public List<List<ClusterNode>> cachePartitionOwners(String cacheName) { - return null; + return null; // TODO ZK } /** {@inheritDoc} */ @@ -60,6 +107,16 @@ class ZkCommunicationProblemContext implements CommunicationProblemContext { if (node == null) throw new NullPointerException(); + if (Collections.binarySearch(curNodes, node, NODE_ORDER_CMP) < 0) + throw new IllegalArgumentException("Invalid node: " + node); + killedNodes.add(node); } + + /** + * @return Nodes to fail. + */ + Set<ClusterNode> killedNodes() { + return killedNodes; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index faea49e..6bdf573 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -69,7 +69,7 @@ class ZkDiscoveryEventsData implements Serializable { /** * @param id Future ID. */ - void communicationErrorResolveFutureId(UUID id) { + void communicationErrorResolveFutureId(@Nullable UUID id) { commErrFutId = id; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java index 19e2acc..2467928 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -141,9 +141,14 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { // TODO ZK: use multi, better batching + max-size safe + NoNodeException safe. String evtDir = paths.distributedFutureBasePath(futId); - client.deleteAll(evtDir, - client.getChildren(evtDir), - -1); + try { + client.deleteAll(evtDir, + client.getChildren(evtDir), + -1); + } + catch (KeeperException.NoNodeException e) { + // TODO ZK + } client.deleteIfExists(evtDir, -1); http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 5923b39..5c0bd58 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -940,7 +940,7 @@ public class ZookeeperClient implements Watcher { else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']'); else - assert rc == 0 : rc; + assert rc == 0 : KeeperException.Code.get(rc); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 62fc581..ef3504f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -246,6 +248,8 @@ public class ZookeeperDiscoveryImpl { * @param err Connect error. */ public void onCommunicationConnectionError(ClusterNode node0, Exception err) { + checkState(); + ZookeeperClusterNode node = node(node0.id()); if (node == null) @@ -269,7 +273,16 @@ public class ZookeeperDiscoveryImpl { ", err= " + err + ']'); } - fut.scheduleCheckOnTimeout(); + ConnectionState connState; + + synchronized (this) { + connState = this.connState; + } + + if (connState != ConnectionState.STARTED) + fut.onError(new IgniteCheckedException("Node stopped.")); + else + fut.scheduleCheckOnTimeout(); } else fut = commErrProcFut.get(); @@ -475,7 +488,7 @@ public class ZookeeperDiscoveryImpl { break; case STOPPED: - throw new IgniteSpiException("Zookeeper client closed."); + throw new IgniteSpiException("Node stopped."); case DISCONNECTED: throw new IgniteClientDisconnectedException(null, "Client is disconnected."); @@ -1025,11 +1038,41 @@ public class ZookeeperDiscoveryImpl { } /** + * @param lastEvts Last events from previous coordinator. + * @throws Exception If failed. + */ + private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws Exception { + for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) { + if (evtData instanceof ZkDiscoveryCustomEventData) { + ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + + // It is possible previous coordinator failed before finished message processing. + if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) { + try { + ZkCommunicationErrorResolveFinishMessage msg = + (ZkCommunicationErrorResolveFinishMessage)evtData0.msg; + + ZkCommunicationErrorResolveResult res = unmarshalZip( + ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, msg.futId)); + + deleteAliveNodes(res.killedNodes); + } + catch (KeeperException.NoNodeException ignore) { + // No-op. + } + } + } + } + } + + /** * @param aliveNodes Alive nodes paths. * @param locInternalId Local node's internal ID. * @throws Exception If failed. */ private void onBecomeCoordinator(List<String> aliveNodes, int locInternalId) throws Exception { + long topVer0 = rtState.evtsData != null ? rtState.evtsData.topVer : -1L; + byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath); if (evtsDataBytes.length > 0) @@ -1044,6 +1087,11 @@ public class ZookeeperDiscoveryImpl { assert locNode.order() > 0 : locNode; assert rtState.evtsData != null; + previousCoordinatorCleanup(rtState.evtsData); + + if (topVer0 > rtState.evtsData.topVer) + rtState.evtsData.topVer = topVer0; + UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); if (futId != null) { @@ -1636,6 +1684,7 @@ public class ZookeeperDiscoveryImpl { assert node != null : msg0.nodeId; + // TODO ZK: delete when process event for (String child : zkClient.getChildren(zkPaths.aliveNodesDir)) { if (ZkIgnitePaths.aliveInternalId(child) == node.internalId()) { zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child); @@ -2021,22 +2070,25 @@ public class ZookeeperDiscoveryImpl { ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId); if (msg.warning != null) { - U.warn(log, "Received EVT_NODE_FAILED event with warning [" + - "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + - ", nodeId=" + msg.nodeId + - ", msg=" + msg.warning + ']'); + U.warn(log, "Received force EVT_NODE_FAILED event with warning [" + + "nodeId=" + msg.nodeId + + ", msg=" + msg.warning + + ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']'); } else { U.warn(log, "Received force EVT_NODE_FAILED event [" + - "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + - ", nodeId=" + msg.nodeId + ']'); + "nodeId=" + msg.nodeId + + ", nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : evtData.sndNodeId) + ']'); } ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId); assert node != null : msg.nodeId; - processNodeFail(node.internalId(), evtData.topologyVersion()); + if (node.isLocal()) + throw localNodeFail("Received force EVT_NODE_FAILED event for local node."); + else + notifyNodeFail(node.internalId(), evtData.topologyVersion()); } /** @@ -2049,13 +2101,10 @@ public class ZookeeperDiscoveryImpl { UUID futId = msg.futId; assert futId != null; - assert futId.equals(rtState.evtsData.communicationErrorResolveFutureId()); if (log.isInfoEnabled()) log.info("Received communication error resolve finish message [reqId=" + futId + ']'); - rtState.evtsData.communicationErrorResolveFutureId(null); - rtState.commErrProcNodes = null; ZkCommunicationErrorResolveResult res = msg.res; @@ -2069,14 +2118,72 @@ public class ZookeeperDiscoveryImpl { Set<Long> failedNodes = null; - if (res.failedNodes != null) { - failedNodes = U.newHashSet(res.failedNodes.size()); + if (res.err != null) + U.error(log, "Communication error resolve failed: " + res.err, res.err); + else { + if (res.killedNodes != null) { + failedNodes = U.newHashSet(res.killedNodes.size()); + + for (int i = 0; i < res.killedNodes.size(); i++) { + int internalId = res.killedNodes.get(i); + + if (internalId == locNode.internalId()) { + fut.onError(new IgniteCheckedException("Local node is forced to stop " + + "by communication error resolver")); + + if (rtState.crd) + deleteAliveNodes(res.killedNodes); + + throw localNodeFail("Local node is forced to stop by communication error resolver " + + "[nodeId=" + locNode.id() + ']'); + } + + ZookeeperClusterNode node = rtState.top.nodesByInternalId.get(internalId); + + assert node != null : internalId; + + failedNodes.add(node.order()); + } + + long topVer = msg.topVer; + + for (int i = 0; i < res.killedNodes.size(); i++) { + int nodeInternalId = res.killedNodes.get(i); - for (int i = 0; i < res.failedNodes.size(); i++) - failedNodes.add(res.failedNodes.get(i)); + ClusterNode node = rtState.top.nodesByInternalId.get(nodeInternalId); + + assert node != null : nodeInternalId; + + if (log.isInfoEnabled()) + log.info("Node stop is forced by communication error resolver [nodeId=" + node.id() + ']'); + + notifyNodeFail(nodeInternalId, ++topVer); + } + } } fut.onFinishResolve(failedNodes); + + if (rtState.crd) + deleteAliveNodes(res.killedNodes); + } + + /** + * @param internalIds Nodes internal IDs. + * @throws Exception If failed. + */ + private void deleteAliveNodes(@Nullable GridIntList internalIds) throws Exception { + if (internalIds == null) + return; + + List<String> alives = rtState.zkClient.getChildren(zkPaths.aliveNodesDir); + + for (int i = 0; i < alives.size(); i++) { + String alive = alives.get(i); + + if (internalIds.contains(ZkIgnitePaths.aliveInternalId(alive))) + rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + alive); + } } /** @@ -2163,7 +2270,7 @@ public class ZookeeperDiscoveryImpl { private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception { ZkDiscoveryEventsData evtsData = rtState.evtsData; - UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); + UUID futId = evtsData.communicationErrorResolveFutureId(); if (log.isInfoEnabled()) log.info("Received communication status from all nodes [reqId=" + futId + ']'); @@ -2178,43 +2285,113 @@ public class ZookeeperDiscoveryImpl { rtState.commErrProcNodes = null; - ZkClusterNodes top = rtState.top; + List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>(); + Map<UUID, BitSet> nodesRes = U.newHashMap(topSnapshot.size()); - for (ZookeeperClusterNode node : top.nodesByOrder.values()) { + Exception err = null; + + for (ClusterNode node : topSnapshot) { byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath, rtState.zkClient, node.order()); ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes); - nodesRes.add(nodeState); + if (nodeState.err != null) { + if (err == null) + err = new Exception("Failed to resolve communication error."); + + err.addSuppressed(nodeState.err); + } + else { + assert nodeState.commState != null; + + nodesRes.put(node.id(), nodeState.commState); + } } - ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId); + long topVer = evtsData.topVer; - ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null); + GridIntList killedNodesList = null; - msg.res = res; + if (err == null) { + boolean fullyConnected = true; - ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId), - rtState.zkClient, - marshalZip(res)); + for (Map.Entry<UUID, BitSet> e : nodesRes.entrySet()) { + if (!checkFullyConnected(e.getValue(), initialNodes, rtState.top)) { + fullyConnected = false; + + break; + } + } - CommunicationProblemResolver rslvr = spi.ignite().configuration().getCommunicationProblemResolver(); + if (fullyConnected) { + if (log.isInfoEnabled()) { + log.info("Finish communication error resolve process automatically, there are no " + + "communication errors [reqId=" + futId + ']'); + } + } + else { + CommunicationProblemResolver rslvr = spi.ignite().configuration().getCommunicationProblemResolver(); + + if (rslvr != null) { + if (log.isInfoEnabled()) { + log.info("Call communication error resolver [reqId=" + futId + + ", rslvr=" + rslvr.getClass().getSimpleName() + ']'); + } - if (rslvr != null) { - ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(); + ZkCommunicationProblemContext ctx = new ZkCommunicationProblemContext(topSnapshot, + initialNodes, + nodesRes); - rslvr.resolve(ctx); + try { + rslvr.resolve(ctx); + + Set<ClusterNode> killedNodes = ctx.killedNodes(); + + if (killedNodes != null) { + if (log.isInfoEnabled()) { + log.info("Communication error resolver forces nodes stop [reqId=" + futId + + ", killNodeCnt=" + killedNodes.size() + + ", nodeIds=" + U.nodeIds(killedNodes) + ']'); + } + + killedNodesList = new GridIntList(killedNodes.size()); + + for (ClusterNode killedNode : killedNodes) { + killedNodesList.add(((ZookeeperClusterNode)killedNode).internalId()); + + evtsData.topVer++; + } + } + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to resolve communication error with configured resolver [reqId=" + futId + ']', e); + } + } + } } + evtsData.communicationErrorResolveFutureId(null); + + ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(killedNodesList, err); + + ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId, topVer); + + msg.res = res; + + ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId), + rtState.zkClient, + marshalZip(res)); + evtsData.evtIdGen++; ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( evtsData.evtIdGen, - evtsData.topVer, + topVer, locNode.id(), msg, null, @@ -2228,6 +2405,30 @@ public class ZookeeperDiscoveryImpl { } /** + * @param commState Node communication state. + * @param initialNodes Topology snapshot when communication error resolve started. + * @param top Current topology. + * @return {@code True} if node has connection to all alive nodes. + */ + private boolean checkFullyConnected(BitSet commState, List<ClusterNode> initialNodes, ZkClusterNodes top) { + int startIdx = 0; + + for (;;) { + int idx = commState.nextClearBit(startIdx); + + if (idx >= initialNodes.size()) + return true; + + ClusterNode node = initialNodes.get(idx); + + if (top.nodesById.containsKey(node.id())) + return false; + + startIdx = idx + 1; + } + } + + /** * */ public void simulateNodeFailure() { @@ -2292,70 +2493,73 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. - * @throws Exception If failed. */ - private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) throws Exception { - processNodeFail(evtData.failedNodeInternalId(), evtData.topologyVersion()); + private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) { + notifyNodeFail(evtData.failedNodeInternalId(), evtData.topologyVersion()); } /** - * @param nodeInternalId Failed node internal ID. + * @param nodeInternalOrder Node order. * @param topVer Topology version. - * @throws Exception If failed. */ - private void processNodeFail(int nodeInternalId, long topVer) throws Exception { - final ZookeeperClusterNode failedNode = rtState.top.removeNode(nodeInternalId); + private void notifyNodeFail(int nodeInternalOrder, long topVer) { + final ZookeeperClusterNode failedNode = rtState.top.removeNode(nodeInternalOrder); - assert failedNode != null; + assert failedNode != null && !failedNode.isLocal() : failedNode; - if (failedNode.isLocal()) { - U.warn(log, "Received EVT_NODE_FAILED for local node."); + PingFuture pingFut = pingFuts.get(failedNode.order()); - rtState.onCloseStart(); + if (pingFut != null) + pingFut.onDone(false); - if (locNode.isClient() && clientReconnectEnabled) { - boolean reconnect = false; + final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - synchronized (stateMux) { - if (connState == ConnectionState.STARTED) { - reconnect = true; + lsnr.onDiscovery(EVT_NODE_FAILED, + topVer, + failedNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + null); + } - connState = ConnectionState.DISCONNECTED; - } - } + /** + * @param msg Message to log. + * @return Exception to be thrown. + */ + private Exception localNodeFail(String msg) { + U.warn(log, msg); + + rtState.onCloseStart(); - if (reconnect) { - UUID newId = UUID.randomUUID(); + if (clientReconnectEnabled) { + assert locNode.isClient() : locNode; - U.quietAndWarn(log, "Received EVT_NODE_FAILED for local node, will try to reconnect with new id [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + ']'); + boolean reconnect = false; - runInWorkerThread(new ReconnectClosure(newId)); + synchronized (stateMux) { + if (connState == ConnectionState.STARTED) { + reconnect = true; + + connState = ConnectionState.DISCONNECTED; } } - else - notifySegmented(); - // Stop any further processing. - throw new ZookeeperClientFailedException("Received node failed event for local node."); - } - else { - PingFuture pingFut = pingFuts.get(failedNode.order()); - - if (pingFut != null) - pingFut.onDone(false); + if (reconnect) { + UUID newId = UUID.randomUUID(); - final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); + U.quietAndWarn(log, "Client node will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); - lsnr.onDiscovery(EVT_NODE_FAILED, - topVer, - failedNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + runInWorkerThread(new ReconnectClosure(newId)); + } } + else + notifySegmented(); + + // Stop any further processing. + return new ZookeeperClientFailedException(msg); } /** @@ -2629,6 +2833,11 @@ public class ZookeeperDiscoveryImpl { if (zkClient != null) zkClient.close(); + ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get(); + + if (commErrFut != null) + commErrFut.onError(new IgniteCheckedException("Node stopped.")); + IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, log); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9e4f2c20/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index cee2e76..a29f478 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -20,7 +20,9 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.File; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -62,6 +64,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -1712,6 +1715,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * TODO ZK + * * @throws Exception If failed. */ public void _testCommunicationFailure() throws Exception { @@ -1918,6 +1923,97 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCommunicationErrorResolve_KillNode_1() throws Exception { + communicationErrorResolve_KillNodes(2, Collections.singletonList(2L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillNode_2() throws Exception { + communicationErrorResolve_KillNodes(3, Collections.singletonList(2L)); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillNode_3() throws Exception { + communicationErrorResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L)); + } + + /** + * @param startNodes Number of nodes to start. + * @param killNodes Nodes to kill by resolve process. + * @throws Exception If failed. + */ + private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { + testCommSpi = true; + + commProblemRslvr = new TestNodeKillCommunicationProblemResolver(killNodes); + + startGrids(startNodes); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0)); + + commSpi.checkRes = new BitSet(startNodes); + + ZookeeperDiscoverySpi spi = null; + + for (Ignite node : G.allGrids()) { + ZookeeperDiscoverySpi spi0 = spi(node); + + if (!killNodes.contains(node.cluster().localNode().order())) { + spi = spi0; + + break; + } + } + + assertNotNull(spi); + + spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new Exception("test")); + + int expNodes = startNodes - killNodes.size(); + + waitForTopology(expNodes); + + for (Ignite node : G.allGrids()) + assertFalse(killNodes.contains(node.cluster().localNode().order())); + + startGrid(startNodes); + + waitForTopology(expNodes + 1); + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillCoordinator() throws Exception { + // Kill coordinator. + testCommSpi = true; + + commProblemRslvr = new TestNodeKillCommunicationProblemResolver(Collections.singleton(1L)); + + startGrids(3); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(2)); + + commSpi.checkRes = new BitSet(3); + + ZookeeperDiscoverySpi spi = spi(ignite(1)); + + spi.onCommunicationConnectionError(ignite(0).cluster().localNode(), new Exception("test")); + + waitForTopology(2); + + startGrid(10); + + waitForTopology(3); + } + + /** + * @throws Exception If failed. + */ public void testConnectionCheck() throws Exception { final int NODES = 5; @@ -1928,9 +2024,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); - List<ClusterNode> nodes = new ArrayList<>(); - - nodes.addAll(node.cluster().nodes()); + List<ClusterNode> nodes = new ArrayList<>(node.cluster().nodes()); BitSet res = spi.checkConnection(nodes).get(); @@ -2622,10 +2716,40 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * */ + static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver { + /** */ + final Collection<Long> killNodeOrders; + + /** + * @param killOrders Killed nodes order. + */ + TestNodeKillCommunicationProblemResolver(Collection<Long> killOrders) { + this.killNodeOrders = killOrders; + } + + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + List<ClusterNode> nodes = ctx.topologySnapshot(); + + assertTrue(nodes.size() > 0); + + for (ClusterNode node : nodes) { + if (killNodeOrders.contains(node.order())) + ctx.killNode(node); + } + } + } + + /** + * + */ static class ZkTestCommunicationSpi extends TcpCommunicationSpi { /** */ private volatile CountDownLatch pingLatch; + /** */ + private volatile BitSet checkRes; + /** * @param ignite Node. * @return Node's communication SPI. @@ -2646,6 +2770,14 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { throw new IgniteException(e); } + BitSet checkRes = this.checkRes; + + if (checkRes != null) { + this.checkRes = null; + + return new IgniteFinishedFutureImpl<>(checkRes); + } + return super.checkConnection(nodes); } }