This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6d97857 IGNITE-13577 Graceful node shutdown for Zookeeper Discovery SPI - Fixes #8371. 6d97857 is described below commit 6d9785706e4a7ca0edeccc32dc6fdf34f9143956 Author: Ivan Daschinskiy <ivanda...@gmail.com> AuthorDate: Mon Nov 2 17:55:13 2020 +0300 IGNITE-13577 Graceful node shutdown for Zookeeper Discovery SPI - Fixes #8371. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 2 +- ...stractCallabck.java => ZkAbstractCallback.java} | 6 +- .../zk/internal/ZkAbstractChildrenCallback.java | 2 +- .../discovery/zk/internal/ZkAbstractWatcher.java | 2 +- .../zk/internal/ZkDiscoveryEventData.java | 4 +- ...ata.java => ZkDiscoveryNodeLeaveEventData.java} | 45 +++- .../spi/discovery/zk/internal/ZkIgnitePaths.java | 57 ++++- .../spi/discovery/zk/internal/ZkRunnable.java | 2 +- .../spi/discovery/zk/internal/ZookeeperClient.java | 26 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 124 +++++++--- .../zk/internal/ZookeeperDiscoveryStatistics.java | 32 ++- .../zk/ZookeeperDiscoverySpiTestSuite1.java | 2 + ...erDiscoveryConcurrentStartAndStartStopTest.java | 6 +- .../zk/internal/ZookeeperDiscoveryMiscTest.java | 2 + ...perDiscoveryRandomStopOrFailConcurrentTest.java | 264 +++++++++++++++++++++ ...coverySegmentationAndConnectionRestoreTest.java | 4 +- .../zk/internal/ZookeeperDiscoverySpiTestBase.java | 90 +++++++ .../internal/ZookeeperDiscoverySpiTestHelper.java | 6 +- ...perDiscoveryTopologyChangeAndReconnectTest.java | 90 ------- .../zookeeper/ZkTestClientCnxnSocketNIO.java | 3 +- 20 files changed, 595 insertions(+), 174 deletions(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 5cdfa58..3de8df6 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -595,7 +595,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis /** {@inheritDoc} */ @Override public long getNodesLeft() { - return 0; + return stats.leftNodesCnt(); } /** {@inheritDoc} */ diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java similarity index 92% rename from modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java rename to modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java index b80a9dd..427a81c 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallback.java @@ -22,12 +22,12 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; /** * */ -abstract class ZkAbstractCallabck { +abstract class ZkAbstractCallback { /** */ final ZkRuntimeState rtState; /** */ - private final ZookeeperDiscoveryImpl impl; + final ZookeeperDiscoveryImpl impl; /** */ private final GridSpinBusyLock busyLock; @@ -36,7 +36,7 @@ abstract class ZkAbstractCallabck { * @param rtState Runtime state. * @param impl Discovery impl. */ - ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { + ZkAbstractCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) { this.rtState = rtState; this.impl = impl; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java index 2292e35..dc680f3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java @@ -24,7 +24,7 @@ import org.apache.zookeeper.data.Stat; /** * */ -abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck implements AsyncCallback.Children2Callback { +abstract class ZkAbstractChildrenCallback extends ZkAbstractCallback implements AsyncCallback.Children2Callback { /** * @param rtState Runtime state. * @param impl Discovery impl. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java index 9098d05..37e65e5 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java @@ -23,7 +23,7 @@ import org.apache.zookeeper.Watcher; /** * */ -abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher { +abstract class ZkAbstractWatcher extends ZkAbstractCallback implements Watcher { /** * @param rtState Runtime state. * @param impl Discovery impl. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index d667a17..2bc49e5 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -30,7 +30,7 @@ abstract class ZkDiscoveryEventData implements Serializable { static final byte ZK_EVT_NODE_JOIN = 1; /** */ - static final byte ZK_EVT_NODE_FAILED = 2; + static final byte ZK_EVT_NODE_LEFT = 2; /** */ static final byte ZK_EVT_CUSTOM_EVT = 3; @@ -59,7 +59,7 @@ abstract class ZkDiscoveryEventData implements Serializable { * @param topVer Topology version. */ ZkDiscoveryEventData(long evtId, byte evtType, long topVer) { - assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_FAILED || evtType == ZK_EVT_CUSTOM_EVT : evtType; + assert evtType == ZK_EVT_NODE_JOIN || evtType == ZK_EVT_NODE_LEFT || evtType == ZK_EVT_CUSTOM_EVT : evtType; this.evtId = evtId; this.evtType = evtType; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java similarity index 53% rename from modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java rename to modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java index c76158f..77d1157 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeLeaveEventData.java @@ -20,36 +20,59 @@ package org.apache.ignite.spi.discovery.zk.internal; /** * */ -class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { +class ZkDiscoveryNodeLeaveEventData extends ZkDiscoveryEventData { /** */ private static final long serialVersionUID = 0L; /** */ - private long failedNodeInternalId; + private final long leftNodeInternalId; + + /** */ + private final boolean failed; /** * @param evtId Event ID. * @param topVer Topology version. - * @param failedNodeInternalId Failed node ID. + * @param leftNodeInternalId Failed node ID. */ - ZkDiscoveryNodeFailEventData(long evtId, long topVer, long failedNodeInternalId) { - super(evtId, ZK_EVT_NODE_FAILED, topVer); + ZkDiscoveryNodeLeaveEventData(long evtId, long topVer, long leftNodeInternalId) { + this(evtId, topVer, leftNodeInternalId, false); + } - this.failedNodeInternalId = failedNodeInternalId; + /** + * @param evtId Event ID. + * @param topVer Topology version. + * @param leftNodeInternalId Left node ID. + */ + ZkDiscoveryNodeLeaveEventData(long evtId, long topVer, long leftNodeInternalId, boolean failed) { + super(evtId, ZK_EVT_NODE_LEFT, topVer); + + this.leftNodeInternalId = leftNodeInternalId; + + this.failed = failed; + } + + /** + * @return Left node ID. + */ + long leftNodeInternalId() { + return leftNodeInternalId; } /** - * @return Failed node ID. + * + * @return {@code true} if failed. */ - long failedNodeInternalId() { - return failedNodeInternalId; + boolean failed() { + return failed; } /** {@inheritDoc} */ @Override public String toString() { - return "ZkDiscoveryNodeFailEventData [" + + return "ZkDiscoveryNodeLeaveEventData [" + "evtId=" + eventId() + ", topVer=" + topologyVersion() + - ", nodeId=" + failedNodeInternalId + ']'; + ", nodeId=" + leftNodeInternalId + + ", failed=" + failed + ']'; } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 4e54254..02e9d36 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -44,6 +44,9 @@ public class ZkIgnitePaths { /** Directory to store acknowledge messages for custom events. */ private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; + /** Directory to store node's stopped flags. */ + private static final String STOPPED_NODES_FLAGS_DIR = "sf"; + /** Directory to store EPHEMERAL znodes for alive cluster nodes. */ static final String ALIVE_NODES_DIR = "n"; @@ -71,6 +74,9 @@ public class ZkIgnitePaths { /** */ final String customEvtsAcksDir; + /** */ + final String stoppedNodesFlagsDir; + /** * @param zkRootPath Base Zookeeper directory for all Ignite nodes. */ @@ -83,6 +89,7 @@ public class ZkIgnitePaths { customEvtsDir = zkPath(CUSTOM_EVTS_DIR); customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR); customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR); + stoppedNodesFlagsDir = zkPath(STOPPED_NODES_FLAGS_DIR); } /** @@ -90,7 +97,7 @@ public class ZkIgnitePaths { * @return Full path. */ private String zkPath(String path) { - return clusterDir + "/" + path; + return join(clusterDir, path); } /** @@ -99,7 +106,7 @@ public class ZkIgnitePaths { * @return Path. */ String joiningNodeDataPath(UUID nodeId, UUID prefixId) { - return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); + return join(joinDataDir, prefixId + ":" + nodeId.toString()); } /** @@ -109,7 +116,7 @@ public class ZkIgnitePaths { static long aliveInternalId(String path) { int idx = path.lastIndexOf('|'); - return Integer.parseInt(path.substring(idx + 1)); + return Long.parseLong(path.substring(idx + 1)); } /** @@ -123,7 +130,7 @@ public class ZkIgnitePaths { if (node.isClient()) flags |= CLIENT_NODE_FLAG_MASK; - return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|"; + return join(aliveNodesDir, prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|"); } /** @@ -156,6 +163,26 @@ public class ZkIgnitePaths { } /** + * @param node Leaving node. + * @return Stopped node path. + */ + String nodeStoppedFlag(ZookeeperClusterNode node) { + String path = node.id().toString() + '|' + node.internalId(); + + return join(stoppedNodesFlagsDir, path); + } + + /** + * @param path Leaving flag path. + * @return Stopped node internal id. + */ + static long stoppedFlagNodeInternalId(String path) { + int idx = path.lastIndexOf('|'); + + return Long.parseLong(path.substring(idx + 1)); + } + + /** * @param path Event zk path. * @return Event sequence number. */ @@ -212,7 +239,7 @@ public class ZkIgnitePaths { * @return Path. */ String createCustomEventPath(String prefix, UUID nodeId, int partCnt) { - return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'; + return join(customEvtsDir, prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'); } /** @@ -221,7 +248,7 @@ public class ZkIgnitePaths { * @return Path. */ String customEventPartsBasePath(String prefix, UUID nodeId) { - return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":"; + return join(customEvtsPartsDir, prefix + ":" + nodeId + ":"); } /** @@ -239,7 +266,7 @@ public class ZkIgnitePaths { * @return Event zk path. */ String joinEventDataPathForJoined(long evtId) { - return evtsPath + "/fj-" + evtId; + return join(evtsPath,"fj-" + evtId); } /** @@ -247,7 +274,7 @@ public class ZkIgnitePaths { * @return Event zk path. */ String joinEventSecuritySubjectPath(long topVer) { - return evtsPath + "/s-" + topVer; + return join(evtsPath, "s-" + topVer); } /** @@ -257,7 +284,7 @@ public class ZkIgnitePaths { String ackEventDataPath(long origEvtId) { assert origEvtId != 0; - return customEvtsAcksDir + "/" + String.valueOf(origEvtId); + return join(customEvtsAcksDir, String.valueOf(origEvtId)); } /** @@ -265,7 +292,7 @@ public class ZkIgnitePaths { * @return Future path. */ String distributedFutureBasePath(UUID id) { - return evtsPath + "/f-" + id; + return join(evtsPath, "f-" + id); } /** @@ -273,7 +300,7 @@ public class ZkIgnitePaths { * @return Future path. */ String distributedFutureResultPath(UUID id) { - return evtsPath + "/fr-" + id; + return join(evtsPath, "fr-" + id); } /** @@ -306,6 +333,14 @@ public class ZkIgnitePaths { } /** + * @param paths Paths to join. + * @return Paths joined with separator. + */ + public static String join(String... paths) { + return String.join(PATH_SEPARATOR, paths); + } + + /** * Validate the provided znode path string. * * @param path znode path string. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java index 965bdc0..1be63e0 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java @@ -20,7 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal; /** * Zk Runnable. */ -public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable { +public abstract class ZkRunnable extends ZkAbstractCallback implements Runnable { /** * @param rtState Runtime state. * @param impl Discovery impl. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 7e1bb9a..e98bc01 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -442,6 +442,32 @@ public class ZookeeperClient implements Watcher { } /** + * @param path Path. + * @param data Data. + * @param createMode Create mode. + * @return Created path. + * @throws KeeperException In case of zookeeper error. + * @throws InterruptedException If interrupted. + */ + String createIfNeededNoRetry(String path, byte[] data, CreateMode createMode) + throws KeeperException, InterruptedException { + assert !createMode.isSequential() : createMode; + + if (data == null) + data = EMPTY_BYTES; + + try { + return zk.create(path, data, ZK_ACL, createMode); + } + catch (KeeperException.NodeExistsException e) { + if (log.isDebugEnabled()) + log.debug("Node already exists: " + path); + + return path; + } + } + + /** * @param checkPrefix Unique prefix to check in case of retry. * @param parentPath Parent node path. * @param path Node to create. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index e9196f2..d9d56ae 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -101,6 +101,7 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; @@ -854,7 +855,8 @@ public class ZookeeperDiscoveryImpl { zkPaths.customEvtsDir, zkPaths.customEvtsPartsDir, zkPaths.customEvtsAcksDir, - zkPaths.aliveNodesDir}; + zkPaths.aliveNodesDir, + zkPaths.stoppedNodesFlagsDir}; List<String> dirs = new ArrayList<>(); @@ -1009,7 +1011,7 @@ public class ZookeeperDiscoveryImpl { final int OVERHEAD = 5; // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193 - String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + locNode.id(); + String joinDataPath = ZkIgnitePaths.join(zkPaths.joinDataDir, prefix + ":" + locNode.id()); if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, OVERHEAD)) { List<byte[]> parts = zkClient.splitNodeData(joinDataPath, joinDataBytes, OVERHEAD); @@ -1379,7 +1381,7 @@ public class ZookeeperDiscoveryImpl { PreviousNodeWatcher watcher = new ServerPreviousNodeWatcher(rtState); - rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); + rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, prevE.getValue()), watcher, watcher); } } @@ -1478,7 +1480,7 @@ public class ZookeeperDiscoveryImpl { PreviousNodeWatcher watcher = new ClientPreviousNodeWatcher(rtState); - rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + watchPath, watcher, watcher); + rtState.zkClient.existsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, watchPath), watcher, watcher); } } @@ -1512,6 +1514,16 @@ public class ZookeeperDiscoveryImpl { * @throws Exception If failed. */ private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws Exception { + for (String stoppedFlagPath : rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir)) { + long leftIntId = ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath); + + if (!rtState.top.nodesByInternalId.containsKey(leftIntId)) { + rtState.zkClient.deleteIfExistsAsync( + ZkIgnitePaths.join(zkPaths.stoppedNodesFlagsDir, stoppedFlagPath) + ); + } + } + for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) { if (evtData instanceof ZkDiscoveryCustomEventData) { ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; @@ -1620,7 +1632,7 @@ public class ZookeeperDiscoveryImpl { private void watchAliveNodeData(String alivePath) { assert rtState.locNodeZkPath != null; - String path = zkPaths.aliveNodesDir + "/" + alivePath; + String path = ZkIgnitePaths.join(zkPaths.aliveNodesDir, alivePath); if (!path.equals(rtState.locNodeZkPath)) rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, rtState.aliveNodeDataWatcher); @@ -1642,6 +1654,11 @@ public class ZookeeperDiscoveryImpl { rtState.updateAlives = false; } + Set<Long> stoppedNodes = new HashSet<>(); + + for (String stoppedFlagPath : rtState.zkClient.getChildren(zkPaths.stoppedNodesFlagsDir)) + stoppedNodes.add(ZkIgnitePaths.stoppedFlagNodeInternalId(stoppedFlagPath)); + TreeMap<Long, String> alives = new TreeMap<>(); for (String child : aliveNodes) { @@ -1670,7 +1687,7 @@ public class ZookeeperDiscoveryImpl { failedNodes.add(failedNode); - generateNodeFail(curTop, failedNode); + generateNodeLeave(curTop, failedNode, !stoppedNodes.contains(failedNode.internalId())); newEvts++; @@ -2031,15 +2048,11 @@ public class ZookeeperDiscoveryImpl { String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId); client.setData(joinDataPath, marshalZip(joinErr), -1); - - client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); } - else { - if (log.isInfoEnabled()) + else if (log.isInfoEnabled()) log.info("Ignore join data, node was failed by previous coordinator: " + aliveNodePath); - client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, -1); - } + client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, aliveNodePath), -1); } /** @@ -2180,25 +2193,35 @@ public class ZookeeperDiscoveryImpl { /** * @param curTop Current topology. - * @param failedNode Failed node. + * @param leftNode Failed node. + * @param failed Whether node failed or stopped. */ - private void generateNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, ZookeeperClusterNode failedNode) { - Object rmvd = curTop.remove(failedNode.order()); + private void generateNodeLeave( + TreeMap<Long, ZookeeperClusterNode> curTop, + ZookeeperClusterNode leftNode, + boolean failed + ) { + Object rmvd = curTop.remove(leftNode.order()); assert rmvd != null; rtState.evtsData.topVer++; rtState.evtsData.evtIdGen++; - ZkDiscoveryNodeFailEventData evtData = new ZkDiscoveryNodeFailEventData( + ZkDiscoveryNodeLeaveEventData evtData = new ZkDiscoveryNodeLeaveEventData( rtState.evtsData.evtIdGen, rtState.evtsData.topVer, - failedNode.internalId()); + leftNode.internalId(), + failed + ); rtState.evtsData.addEvent(curTop.values(), evtData); - if (log.isInfoEnabled()) - log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); + if (log.isInfoEnabled()) { + String evtName = failed ? "NODE_FAILED" : "NODE_LEFT"; + + log.info("Generated " + evtName + " event [evt=" + evtData + ']'); + } } /** @@ -2389,12 +2412,14 @@ public class ZookeeperDiscoveryImpl { batch.addAll(client.getChildrenPaths(zkPaths.customEvtsAcksDir)); + batch.addAll(client.getChildrenPaths(zkPaths.stoppedNodesFlagsDir)); + client.deleteAll(batch, -1); if (startInternalOrder > 0) { for (String alive : client.getChildren(zkPaths.aliveNodesDir)) { if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder) - client.deleteIfExists(zkPaths.aliveNodesDir + "/" + alive, -1); + client.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, alive), -1); } } @@ -2423,7 +2448,7 @@ public class ZookeeperDiscoveryImpl { return readMultipleParts(zkClient, partsBasePath, partCnt); } else - return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath); + return zkClient.getData(ZkIgnitePaths.join(zkPaths.customEvtsDir, evtPath)); } /** @@ -2594,7 +2619,7 @@ public class ZookeeperDiscoveryImpl { for (String child : rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) { if (ZkIgnitePaths.aliveInternalId(child) == internalId) { // Need use sync delete to do not process again join of this node again. - rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + "/" + child, -1); + rtState.zkClient.deleteIfExists(ZkIgnitePaths.join(zkPaths.aliveNodesDir, child), -1); return; } @@ -2623,7 +2648,7 @@ public class ZookeeperDiscoveryImpl { } } - zkClient.deleteIfExistsAsync(zkPaths.customEvtsDir + "/" + evtPath); + zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.customEvtsDir, evtPath)); } /** @@ -2690,13 +2715,13 @@ public class ZookeeperDiscoveryImpl { break; } - case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { + case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: { if (!rtState.joined) break; evtProcessed = true; - notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData); + notifyNodeLeave((ZkDiscoveryNodeLeaveEventData)evtData); break; } @@ -3204,7 +3229,7 @@ public class ZookeeperDiscoveryImpl { String alive = alives.get(i); if (internalIds.contains(ZkIgnitePaths.aliveInternalId(alive))) - rtState.zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + alive); + rtState.zkClient.deleteIfExistsAsync(ZkIgnitePaths.join(zkPaths.aliveNodesDir, alive)); } } @@ -3532,8 +3557,8 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. */ - private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) { - notifyNodeFail(evtData.failedNodeInternalId(), evtData.topologyVersion()); + private void notifyNodeLeave(final ZkDiscoveryNodeLeaveEventData evtData) { + notifyNodeLeave(evtData.leftNodeInternalId(), evtData.topologyVersion(), evtData.failed()); } /** @@ -3541,11 +3566,23 @@ public class ZookeeperDiscoveryImpl { * @param topVer Topology version. */ private void notifyNodeFail(long nodeInternalOrder, long topVer) { - final ZookeeperClusterNode failedNode = rtState.top.removeNode(nodeInternalOrder); + notifyNodeLeave(nodeInternalOrder, topVer, true); + } - assert failedNode != null && !failedNode.isLocal() : failedNode; + /** + * @param nodeInternalOrder Node order. + * @param topVer Topology version. + * @param failed {@code true} if node failed, {@code false} otherwise. + */ + private void notifyNodeLeave(long nodeInternalOrder, long topVer, boolean failed) { + final ZookeeperClusterNode leftNode = rtState.top.removeNode(nodeInternalOrder); - PingFuture pingFut = pingFuts.get(failedNode.order()); + assert leftNode != null && !leftNode.isLocal() : leftNode; + + if (!failed && rtState.crd) + rtState.zkClient.deleteIfExistsAsync(zkPaths.nodeStoppedFlag(leftNode)); + + PingFuture pingFut = pingFuts.get(leftNode.order()); if (pingFut != null) pingFut.onDone(false); @@ -3554,9 +3591,9 @@ public class ZookeeperDiscoveryImpl { lsnr.onDiscovery( new DiscoveryNotification( - EVT_NODE_FAILED, + failed ? EVT_NODE_FAILED : EVT_NODE_LEFT, topVer, - failedNode, + leftNode, topSnapshot, Collections.emptyMap(), null, @@ -3564,7 +3601,10 @@ public class ZookeeperDiscoveryImpl { ) ).get(); - stats.onNodeFailed(); + if (failed) + stats.onNodeFailed(); + else + stats.onNodeLeft(); } /** @@ -3680,11 +3720,11 @@ public class ZookeeperDiscoveryImpl { break; } - case ZkDiscoveryEventData.ZK_EVT_NODE_FAILED: { + case ZkDiscoveryEventData.ZK_EVT_NODE_LEFT: { if (log.isDebugEnabled()) - log.debug("All nodes processed node fail [evtData=" + evtData + ']'); + log.debug("All nodes processed node left [evtData=" + evtData + ']'); - break; // Do not need addition cleanup. + break; } } @@ -3899,7 +3939,7 @@ public class ZookeeperDiscoveryImpl { * */ public void stop() { - stop0(new IgniteSpiException("Node stopped")); + stop0(null); } /** @@ -3913,6 +3953,14 @@ public class ZookeeperDiscoveryImpl { if (rtState.zkClient != null && rtState.locNodeZkPath != null && rtState.zkClient.connected()) { try { + if (e == null && rtState.joined) { + rtState.zkClient.createIfNeededNoRetry( + zkPaths.nodeStoppedFlag(locNode), + null, + PERSISTENT + ); + } + rtState.zkClient.deleteIfExistsNoRetry(rtState.locNodeZkPath, -1); } catch (Exception err) { diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java index cc95dd3..21b62c4 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -23,42 +24,55 @@ import org.apache.ignite.internal.util.typedef.internal.S; */ public class ZookeeperDiscoveryStatistics { /** */ - private long joinedNodesCnt; + private final LongAdder joinedNodesCnt = new LongAdder(); /** */ - private long failedNodesCnt; + private final LongAdder failedNodesCnt = new LongAdder(); + + /** */ + private final LongAdder leftNodesCnt = new LongAdder(); /** Communication error count. */ - private long commErrCnt; + private final LongAdder commErrCnt = new LongAdder(); /** */ public long joinedNodesCnt() { - return joinedNodesCnt; + return joinedNodesCnt.longValue(); } /** */ public long failedNodesCnt() { - return failedNodesCnt; + return failedNodesCnt.longValue(); + } + + /** */ + public long leftNodesCnt() { + return leftNodesCnt.longValue(); } /** */ public long commErrorCount() { - return commErrCnt; + return commErrCnt.longValue(); } /** */ public void onNodeJoined() { - joinedNodesCnt++; + joinedNodesCnt.increment(); } /** */ public void onNodeFailed() { - failedNodesCnt++; + failedNodesCnt.increment(); + } + + /** */ + public void onNodeLeft() { + leftNodesCnt.increment(); } /** */ public void onCommunicationError() { - commErrCnt++; + commErrCnt.increment(); } /** {@inheritDoc} */ diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java index d5be881..03d6a43 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java @@ -24,6 +24,7 @@ import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryCommunicati import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryConcurrentStartAndStartStopTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryCustomEventsTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryMiscTest; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryRandomStopOrFailConcurrentTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySegmentationAndConnectionRestoreTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslFailedAuthTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslSuccessfulAuthTest; @@ -43,6 +44,7 @@ import org.junit.runners.Suite; ZookeeperValidatePathsTest.class, ZookeeperDiscoverySegmentationAndConnectionRestoreTest.class, ZookeeperDiscoveryConcurrentStartAndStartStopTest.class, + ZookeeperDiscoveryRandomStopOrFailConcurrentTest.class, ZookeeperDiscoveryTopologyChangeAndReconnectTest.class, ZookeeperDiscoveryCommunicationFailureTest.class, ZookeeperDiscoveryClientDisconnectTest.class, diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java index 1572af5..cea5975 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryConcurrentStartAndStartStopTest.java @@ -180,7 +180,9 @@ public class ZookeeperDiscoveryConcurrentStartAndStartStopTest extends Zookeeper }, NODES, "stop-node"); for (int j = 0; j < NODES; j++) - expEvts[j] = ZookeeperDiscoverySpiTestHelper.failEvent(++topVer); + expEvts[j] = ZookeeperDiscoverySpiTestHelper.leftEvent(++topVer, false); + + helper.checkEvents(ignite(0), evts, expEvts); checkEventsConsistency(); } @@ -199,6 +201,8 @@ public class ZookeeperDiscoveryConcurrentStartAndStartStopTest extends Zookeeper startGridsMultiThreaded(3, false); + checkZkNodesCleanup(); + waitForTopology(3); } diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java index c644a4b..f271bad 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryMiscTest.java @@ -255,6 +255,8 @@ public class ZookeeperDiscoveryMiscTest extends ZookeeperDiscoverySpiTestBase { stopGrid(0); + waitForTopology(2); + assertEquals(mbean.getCoordinator(), srv2.localNode().id()); assertEquals(mbean.getCoordinatorNodeFormatted(), String.valueOf(srv2.localNode())); } diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java new file mode 100644 index 0000000..0f9935b --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryRandomStopOrFailConcurrentTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.DiscoverySpiMBean; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiMBean; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * + */ +@RunWith(Parameterized.class) +public class ZookeeperDiscoveryRandomStopOrFailConcurrentTest extends ZookeeperDiscoverySpiTestBase { + /** */ + private static final int NUM_CLIENTS = 10; + + /** */ + private static final int NUM_SERVERS = 10; + + /** */ + private static final int ZK_SESSION_TIMEOUT = 5_000; + + /** */ + @Parameterized.Parameters(name = "stop mode = {0}, with crd = {1}") + public static Collection<Object[]> parameters() { + List<Object[]> params = new ArrayList<>(); + + for (StopMode stopMode: StopMode.values()) { + params.add(new Object[] {stopMode, true}); + params.add(new Object[] {stopMode, false}); + } + + return params; + } + + /** */ + @Parameterized.Parameter(0) + public StopMode stopMode; + + /** */ + @Parameterized.Parameter(1) + public boolean killCrd; + + /** */ + private final AtomicLong nodesLeft = new AtomicLong(0); + + /** */ + private final AtomicLong nodesFailed = new AtomicLong(0); + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClusterStateOnStart(ClusterState.INACTIVE); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + sesTimeout = ZK_SESSION_TIMEOUT; + + testSockNio = true; + + clientReconnectDisabled = true; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (Ignite g: G.allGrids()) { + ZkTestClientCnxnSocketNIO cnxn = ZkTestClientCnxnSocketNIO.forNode(g); + + if (cnxn != null) + cnxn.allowConnect(); + } + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected void waitForTopology(int expSize) throws Exception { + assertTrue(GridTestUtils.waitForCondition(() -> grid(0).cluster().nodes().size() == expSize, 30_000)); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testStopOrFailConcurrently() throws Exception { + IgniteEx client = startServersAndClients(NUM_SERVERS, NUM_CLIENTS); + + int crd = getCoordinatorIndex(); + + List<Integer> srvToStop = IntStream.range(1, NUM_SERVERS + 1) + .filter(j -> j != crd) + .boxed() + .collect(Collectors.collectingAndThen(Collectors.toList(), list -> { + Collections.shuffle(list); + + return list.subList(0, NUM_SERVERS / 2); + })); + + if (killCrd) + srvToStop.set(0, crd); + + List<Integer> cliToStop = IntStream.range(NUM_SERVERS + 1, NUM_CLIENTS + NUM_SERVERS) + .boxed() + .collect(Collectors.collectingAndThen(Collectors.toList(), list -> { + Collections.shuffle(list); + + return list.subList(0, NUM_CLIENTS / 2); + })); + + srvToStop.addAll(cliToStop); + + stopOrKillMultithreaded(srvToStop); + + waitForTopology(NUM_CLIENTS + NUM_SERVERS - srvToStop.size()); + + checkStopFlagsDeleted(10_000); + + DiscoverySpiMBean mBean = getMbean(client); + + GridTestUtils.waitForCondition(() -> nodesLeft.get() == mBean.getNodesLeft(), 10_000); + GridTestUtils.waitForCondition(() -> nodesFailed.get() == mBean.getNodesFailed(), 10_000); + } + + /** */ + private void checkStopFlagsDeleted(long timeout) throws Exception { + ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(), + zkCluster.getConnectString(), + 30_000, + null); + + ZkIgnitePaths paths = new ZkIgnitePaths(ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT); + + GridTestUtils.waitForCondition(() -> { + try { + return zkClient.getChildren(paths.stoppedNodesFlagsDir).isEmpty(); + } + catch (Exception e) { + if (e instanceof InterruptedException) + Thread.currentThread().interrupt(); + + throw new RuntimeException("Failed to wait for stopped nodes flags", e); + } + }, timeout); + } + + /** */ + private void stopOrKillMultithreaded(final List<Integer> stopIndices) throws Exception { + log.info("Stopping or killing nodes by idx: " + stopIndices.toString()); + + final StopMode mode = stopMode; + + GridTestUtils.runMultiThreaded((idx) -> { + try { + Random rnd = ThreadLocalRandom.current(); + + int nodeIdx = stopIndices.get(idx); + + if (mode == StopMode.FAIL_ONLY || (mode == StopMode.RANDOM && rnd.nextBoolean())) { + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(grid(nodeIdx)); + + c0.closeSocket(true); + + nodesFailed.incrementAndGet(); + } + else { + stopGrid(nodeIdx); + + nodesLeft.incrementAndGet(); + } + } + catch (Exception e) { + e.printStackTrace(); + + fail(e.getMessage()); + } + }, stopIndices.size(), "stop-node"); + } + + /** */ + private int getCoordinatorIndex() { + UUID crdId = getMbean(grid(0)).getCoordinator(); + + Optional<Integer> crdIdx = grid(0).cluster().nodes().stream().filter(n -> n.id().equals(crdId)) + .map(n -> getTestIgniteInstanceIndex((String)n.consistentId())).findAny(); + + assertTrue(crdIdx.isPresent()); + + return crdIdx.get(); + } + + /** */ + private DiscoverySpiMBean getMbean(IgniteEx grid) { + ZookeeperDiscoverySpiMBean bean = getMxBean(grid.context().igniteInstanceName(), "SPIs", + ZookeeperDiscoverySpi.class, ZookeeperDiscoverySpiMBean.class); + + assertNotNull(bean); + + return bean; + } + + /** */ + private IgniteEx startServersAndClients(int numServers, int numClients) throws Exception { + startGridsMultiThreaded(1, numServers); + startClientGridsMultiThreaded(numServers + 1, numClients - 1); + + IgniteEx res = startClientGrid(0); + + waitForTopology(numClients + numServers); + + // Set initial value of counters from MBean. + nodesLeft.addAndGet(getMbean(res).getNodesLeft()); + nodesFailed.addAndGet(getMbean(res).getNodesFailed()); + + return res; + } + + enum StopMode { + STOP_ONLY, + FAIL_ONLY, + RANDOM + } +} diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java index 49e39a8..d33932b 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySegmentationAndConnectionRestoreTest.java @@ -391,7 +391,7 @@ public class ZookeeperDiscoverySegmentationAndConnectionRestoreTest extends Zook closeZkClient(spi); - helper.checkEvents(node0, evts, ZookeeperDiscoverySpiTestHelper.failEvent(4)); + helper.checkEvents(node0, evts, ZookeeperDiscoverySpiTestHelper.leftEvent(4, true)); } c1.allowConnect(); @@ -399,7 +399,7 @@ public class ZookeeperDiscoverySegmentationAndConnectionRestoreTest extends Zook helper.checkEvents(ignite(1), evts, ZookeeperDiscoverySpiTestHelper.joinEvent(3)); if (failWhenDisconnected) { - helper.checkEvents(ignite(1), evts, ZookeeperDiscoverySpiTestHelper.failEvent(4)); + helper.checkEvents(ignite(1), evts, ZookeeperDiscoverySpiTestHelper.leftEvent(4, true)); IgnitionEx.stop(getTestIgniteInstanceName(2), true, ShutdownPolicy.IMMEDIATE, true); } diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java index d23aa97..bed11bd 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java @@ -80,7 +80,10 @@ import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; +import org.apache.zookeeper.ZooKeeper; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.SECONDS; @@ -566,6 +569,93 @@ class ZookeeperDiscoverySpiTestBase extends GridCommonAbstractTest { } } + /** + * @throws Exception If failed. + */ + protected void checkZkNodesCleanup() throws Exception { + final ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(), + zkCluster.getConnectString(), + 30_000, + null); + + final String basePath = ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT + "/"; + + final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/"; + + try { + List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT); + + boolean foundAlive = false; + + for (String znode : znodes) { + if (znode.startsWith(aliveDir)) { + foundAlive = true; + + break; + } + } + + assertTrue(foundAlive); // Sanity check to make sure we check correct directory. + + assertTrue("Failed to wait for unused znodes cleanup", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT); + + for (String znode : znodes) { + if (znode.startsWith(aliveDir) || znode.length() < basePath.length()) + continue; + + znode = znode.substring(basePath.length()); + + if (!znode.contains("/")) // Ignore roots. + continue; + + // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193 + if (znode.startsWith("jd/")) + continue; + + log.info("Found unexpected znode: " + znode); + + return false; + } + + return true; + } + catch (Exception e) { + error("Unexpected error: " + e, e); + + fail("Unexpected error: " + e); + } + + return false; + } + }, 10_000)); + } + finally { + zkClient.close(); + } + } + + /** + * @param zk ZooKeeper client. + * @param root Root path. + * @return All children znodes for given path. + * @throws Exception If failed/ + */ + private List<String> listSubTree(ZooKeeper zk, String root) throws Exception { + for (int i = 0; i < 30; i++) { + try { + return ZKUtil.listSubTreeBFS(zk, root); + } + catch (KeeperException.NoNodeException e) { + info("NoNodeException when get znodes, will retry: " + e); + } + } + + throw new Exception("Failed to get znodes: " + root); + } + /** */ private CacheConfiguration getCacheConfiguration() { CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java index 32e3855..be5f2e6 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestHelper.java @@ -135,8 +135,10 @@ class ZookeeperDiscoverySpiTestHelper { * @param topVer Topology version. * @return Expected event instance. */ - static DiscoveryEvent failEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); + static DiscoveryEvent leftEvent(long topVer, boolean fail) { + int eventType = fail ? EventType.EVT_NODE_FAILED : EventType.EVT_NODE_LEFT; + + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, eventType, null); expEvt.topologySnapshot(topVer, null); diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java index ba17a2f..f38baa7 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryTopologyChangeAndReconnectTest.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; import org.apache.ignite.internal.processors.query.DummyQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -49,8 +48,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; import org.apache.zookeeper.ZooKeeper; import org.junit.Ignore; @@ -240,74 +237,6 @@ public class ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD /** * @throws Exception If failed. */ - private void checkZkNodesCleanup() throws Exception { - final ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(), - zkCluster.getConnectString(), - 30_000, - null); - - final String basePath = ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT + "/"; - - final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/"; - - try { - List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT); - - boolean foundAlive = false; - - for (String znode : znodes) { - if (znode.startsWith(aliveDir)) { - foundAlive = true; - - break; - } - } - - assertTrue(foundAlive); // Sanity check to make sure we check correct directory. - - assertTrue("Failed to wait for unused znodes cleanup", GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - List<String> znodes = listSubTree(zkClient.zk(), ZookeeperDiscoverySpiTestHelper.IGNITE_ZK_ROOT); - - for (String znode : znodes) { - if (znode.startsWith(aliveDir) || znode.length() < basePath.length()) - continue; - - znode = znode.substring(basePath.length()); - - if (!znode.contains("/")) // Ignore roots. - continue; - - // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193 - if (znode.startsWith("jd/")) - continue; - - log.info("Found unexpected znode: " + znode); - - return false; - } - - return true; - } - catch (Exception e) { - error("Unexpected error: " + e, e); - - fail("Unexpected error: " + e); - } - - return false; - } - }, 10_000)); - } - finally { - zkClient.close(); - } - } - - /** - * @throws Exception If failed. - */ @Ignore("https://issues.apache.org/jira/browse/IGNITE-9138") @Test public void testRandomTopologyChanges_RestartZk() throws Exception { @@ -767,25 +696,6 @@ public class ZookeeperDiscoveryTopologyChangeAndReconnectTest extends ZookeeperD } /** - * @param zk ZooKeeper client. - * @param root Root path. - * @return All children znodes for given path. - * @throws Exception If failed/ - */ - private List<String> listSubTree(ZooKeeper zk, String root) throws Exception { - for (int i = 0; i < 30; i++) { - try { - return ZKUtil.listSubTreeBFS(zk, root); - } - catch (KeeperException.NoNodeException e) { - info("NoNodeException when get znodes, will retry: " + e); - } - } - - throw new Exception("Failed to get znodes: " + root); - } - - /** * @param cacheName Cache name. * @return Configuration. */ diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java index 2b741a1..47fe0ac 100644 --- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java @@ -114,7 +114,8 @@ public class ZkTestClientCnxnSocketNIO extends ClientCnxnSocketNIO { * */ public void allowConnect() { - assert blockConnectLatch != null && blockConnectLatch.getCount() == 1 : blockConnectLatch; + if (blockConnectLatch == null || blockConnectLatch.getCount() == 0) + return; log.info("ZkTestClientCnxnSocketNIO allowConnect [node=" + nodeName + ']');