Repository: ignite Updated Branches: refs/heads/ignite-zk 5d8feab45 -> 9ffd603d2
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ffd603d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ffd603d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ffd603d Branch: refs/heads/ignite-zk Commit: 9ffd603d217034247497b6c2734933872c8a78ed Parents: 5d8feab Author: sboikov <sboi...@gridgain.com> Authored: Thu Nov 23 12:04:42 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Nov 23 13:44:06 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 15 - .../org/apache/ignite/internal/IgnitionEx.java | 28 +- .../zk/internal/ZkDiscoveryEventData.java | 49 +++- .../zk/internal/ZkDiscoveryEventsData.java | 9 +- .../internal/ZkDiscoveryNodeFailEventData.java | 4 +- .../internal/ZkDiscoveryNodeJoinEventData.java | 4 +- .../discovery/zk/internal/ZkEventAckFuture.java | 1 - .../discovery/zk/internal/ZkIgnitePaths.java | 50 +++- .../zk/internal/ZkJoinEventDataForJoined.java | 6 + .../discovery/zk/internal/ZookeeperClient.java | 50 ++++ .../ZookeeperClientFailedException.java | 2 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 287 ++++++++++--------- .../ZookeeperDiscoverySpiBasicTest.java | 201 +++++++++++-- 13 files changed, 525 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b58819c..19e1f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -49,7 +49,6 @@ 
import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; import javax.management.JMException; import javax.management.ObjectName; -import org.apache.curator.test.TestingCluster; import org.apache.ignite.DataRegionMetrics; import org.apache.ignite.DataRegionMetricsAdapter; import org.apache.ignite.DataStorageMetrics; @@ -265,20 +264,6 @@ import static org.apache.ignite.lifecycle.LifecycleEventType.BEFORE_NODE_START; */ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** */ - public static TestingCluster zkCluster; - - static { - zkCluster = new TestingCluster(1); - - try { - zkCluster.start(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - /** */ private static final long serialVersionUID = 0L; /** Ignite site that is shown in log messages. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index e6f5442..cc7e266 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -44,6 +44,7 @@ import java.util.logging.Handler; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -153,6 +154,25 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_J * GridConfiguration cfg = new GridConfiguration(); */ public class IgnitionEx { + /** */ + public static final boolean TEST_ZK = true; + + /** */ + public static TestingCluster zkCluster; + + 
static { + if (TEST_ZK) { + zkCluster = new TestingCluster(1); + + try { + zkCluster.start(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + /** Default configuration path relative to Ignite home. */ public static final String DFLT_CFG = "config/default-config.xml"; @@ -2222,11 +2242,13 @@ public class IgnitionEx { initializeDataStorageConfiguration(myCfg); - ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + if (TEST_ZK) { + ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); - zkSpi.setZkConnectionString(IgniteKernal.zkCluster.getConnectString()); + zkSpi.setZkConnectionString(zkCluster.getConnectString()); - myCfg.setDiscoverySpi(zkSpi); + myCfg.setDiscoverySpi(zkSpi); + } return myCfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index 00330e4..e7e8d31 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -49,6 +49,7 @@ abstract class ZkDiscoveryEventData implements Serializable { int flags; /** + * @param evtId Event ID. * @param evtType Event type. * @param topVer Topology version. */ @@ -60,21 +61,46 @@ abstract class ZkDiscoveryEventData implements Serializable { this.topVer = topVer; } - void remainingAcks(Collection<ZookeeperClusterNode> nodes) { + /** + * @param nodes Current nodes in topology. 
+ */ + void initRemainingAcks(Collection<ZookeeperClusterNode> nodes) { assert remainingAcks == null : this; remainingAcks = U.newHashSet(nodes.size()); for (ZookeeperClusterNode node : nodes) { - if (!node.isLocal() && node.order() <= topVer) - remainingAcks.add(node.internalId()); + if (!node.isLocal() && node.order() <= topVer) { + boolean add = remainingAcks.add(node.internalId()); + + assert add : node; + } } } + /** + * @param node Node. + */ + void addRemainingAck(ZookeeperClusterNode node) { + assert node.order() <= topVer : node; + + boolean add = remainingAcks.add(node.internalId()); + + assert add : node; + } + + /** + * @return {@code True} if all nodes processed event. + */ boolean allAcksReceived() { return remainingAcks.isEmpty(); } + /** + * @param nodeInternalId Node ID. + * @param ackEvtId Last event ID processed on node. + * @return {@code True} if all nodes processed event. + */ boolean onAckReceived(Integer nodeInternalId, long ackEvtId) { assert remainingAcks != null; @@ -84,6 +110,10 @@ abstract class ZkDiscoveryEventData implements Serializable { return remainingAcks.isEmpty(); } + /** + * @param node Failed node. + * @return {@code True} if all nodes processed event. + */ boolean onNodeFail(ZookeeperClusterNode node) { assert remainingAcks != null : this; @@ -92,18 +122,31 @@ abstract class ZkDiscoveryEventData implements Serializable { return remainingAcks.isEmpty(); } + /** + * @param flag Flag mask. + * @return {@code True} if flag set. + */ boolean flagSet(int flag) { return (flags & flag) == flag; } + /** + * @return Event ID. + */ long eventId() { return evtId; } + /** + * @return Event type. + */ int eventType() { return evtType; } + /** + * @return Event topology version. 
+ */ long topologyVersion() { return topVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index ce21a06..6625ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -44,9 +44,9 @@ class ZkDiscoveryEventsData implements Serializable { TreeMap<Long, ZkDiscoveryEventData> evts; /** - * @param topVer - * @param gridStartTime - * @param evts + * @param topVer Current topology version. + * @param gridStartTime Cluster start time. + * @param evts Events history. */ ZkDiscoveryEventsData(long gridStartTime, long topVer, TreeMap<Long, ZkDiscoveryEventData> evts) { this.gridStartTime = gridStartTime; @@ -55,6 +55,7 @@ class ZkDiscoveryEventsData implements Serializable { } /** + * @param nodes Current nodes in topology (these nodes should ack that event processed). * @param evt Event. 
*/ void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData evt) { @@ -62,6 +63,6 @@ class ZkDiscoveryEventsData implements Serializable { assert old == null : old; - evt.remainingAcks(nodes); + evt.initRemainingAcks(nodes); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java index e6ba4bd..b25f39c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java @@ -49,6 +49,8 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { /** {@inheritDoc} */ @Override public String toString() { - return "NodeFailEventData [topVer=" + topologyVersion() + ", nodeId=" + failedNodeInternalId + ']'; + return "ZkDiscoveryNodeFailEventData [topVer=" + topologyVersion() + + ", evtId=" + eventId() + + ", nodeId=" + failedNodeInternalId + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index 4482916..e96f386 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -51,6 +51,8 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { /** {@inheritDoc} */ @Override public String toString() { - return "NodeJoinEventData [topVer=" + topologyVersion() + ", node=" + nodeId + ']'; + return "ZkDiscoveryNodeJoinEventData [topVer=" + topologyVersion() + + ", evtId=" + eventId() + + ", node=" + nodeId + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java index c89b586..ab0dad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java @@ -17,7 +17,6 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.ignite.IgniteLogger; http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 1f6315c..9f1b859 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -171,49 +171,93 @@ class ZkIgnitePaths { * @param path 
Relative path. * @return Full path. */ - String zkPath(String path) { + private String zkPath(String path) { return basePath + "/" + clusterName + "/" + path; } + /** + * @param path Alive node zk path. + * @return Node internal ID. + */ static int aliveInternalId(String path) { int idx = path.lastIndexOf('|'); return Integer.parseInt(path.substring(idx + 1)); } + /** + * @param path Alive node zk path. + * @return Node ID. + */ + static String aliveNodePrefixId(String path) { + return path.substring(0, ZkIgnitePaths.UUID_LEN); + } + + /** + * @param path Alive node zk path. + * @return Node ID. + */ static UUID aliveNodeId(String path) { - String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN); + // <uuid prefix>:<node id>|<join data seq>|<alive seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); return UUID.fromString(idStr); } - static int aliveJoinSequence(String path) { + /** + * @param path Alive node zk path. + * @return Joined node sequence. + */ + static int aliveJoinDataSequence(String path) { int idx1 = path.indexOf('|'); int idx2 = path.lastIndexOf('|'); return Integer.parseInt(path.substring(idx1 + 1, idx2)); } + /** + * @param path Event zk path. + * @return Event sequence number. + */ static int customEventSequence(String path) { int idx = path.lastIndexOf('|'); return Integer.parseInt(path.substring(idx + 1)); } + /** + * @param path Custom event zl path. + * @return Event node ID. + */ static UUID customEventSendNodeId(String path) { String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN); return UUID.fromString(idStr); } + /** + * @param evtId Event ID. + * @return Event zk path. + */ String joinEventDataPath(long evtId) { return evtsPath + "/" + evtId; } + /** + * @param evtId Event ID. + * @return Event zk path. + */ String joinEventDataPathForJoined(long evtId) { return evtsPath + "/joined-" + evtId; } + /** + * @param ack Ack event flag. 
+ * @param child Event child path. + * @return Full event data path. + */ String customEventDataPath(boolean ack, String child) { String baseDir = ack ? customEvtsAcksDir : customEvtsDir; http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java index cdbfdc0..eb24f27 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java @@ -43,10 +43,16 @@ class ZkJoinEventDataForJoined implements Serializable { this.discoData = discoData; } + /** + * @return Current topology. + */ List<ZookeeperClusterNode> topology() { return top; } + /** + * @return Discovery data. 
+ */ Map<Integer, Serializable> discoveryData() { return discoData; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 626b235..d4d23ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -266,6 +266,55 @@ public class ZookeeperClient implements Watcher { } } + String createSequential(String checkPrefix, String dir, String childPath, byte[] data, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException + { + assert createMode.isSequential() : createMode; + + if (data == null) + data = EMPTY_BYTES; + + boolean first = true; + + String path = dir + "/" + childPath; + + for (;;) { + long connStartTime = this.connStartTime; + + try { + if (first) { + List<String> children = zk.getChildren(dir, false); + + for (int i = 0; i < children.size(); i++) { + String child = children.get(i); + + if (children.get(i).startsWith(checkPrefix)) { + String resPath = dir + "/" + child; + + log.info("Check before retry, node already created: " + resPath); + + return resPath; + } + } + } + + return zk.create(path, data, ZK_ACL, createMode); + } + catch (KeeperException.NodeExistsException e) { + assert !createMode.isSequential() : createMode; + + log.info("Node already exists: " + path); + + return path; + } + catch (Exception e) { + onZookeeperError(connStartTime, e); + } + + first = false; + } + } + List<String> getChildren(String path) throws ZookeeperClientFailedException, InterruptedException { @@ -469,6 +518,7 @@ public class 
ZookeeperClient implements Watcher { U.warn(log, "Zookeeper operation failed, will retry [err=" + e + ", retryTimeout=" + RETRY_TIMEOUT + ", connLossTimeout=" + connLossTimeout + + ", path=" + ((KeeperException)e).getPath() + ", remainingWaitTime=" + remainingTime + ']'); stateMux.wait(RETRY_TIMEOUT); http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java index b222d58..99f2a6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java @@ -27,7 +27,7 @@ public class ZookeeperClientFailedException extends Exception { /** * @param cause Cause. 
*/ - public ZookeeperClientFailedException(Throwable cause) { + ZookeeperClientFailedException(Throwable cause) { super(cause); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 00a0974..a04314d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; @@ -122,10 +123,13 @@ public class ZookeeperDiscoveryImpl { /** */ private final int evtsAckThreshold; + /** */ + private int procEvtCnt; + /** * @param log Logger. * @param basePath Zookeeper base path node all nodes. - * @param clusterName Cluster name ( + * @param clusterName Cluster name. * @param locNode Local node instance. * @param lsnr Discovery events listener. * @param exchange Discovery data exchange. @@ -351,18 +355,29 @@ public class ZookeeperDiscoveryImpl { */ private void startJoin(byte[] joinDataBytes) throws InterruptedException { try { + String prefix = UUID.randomUUID().toString(); + // TODO ZK: handle max size. - // TODO ZK: handle retries. 
- String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + locNode.id() + "|", + + String path = zkClient.createSequential(prefix, + zkPaths.joinDataDir, + prefix + ":" + locNode.id() + "|", joinDataBytes, EPHEMERAL_SEQUENTIAL); int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') + 1)); - locNodeZkPath = zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() + "|" + seqNum + "|", + locNodeZkPath = zkClient.createSequential( + prefix, + zkPaths.aliveNodesDir, + prefix + ":" + locNode.id() + "|" + seqNum + "|", null, EPHEMERAL_SEQUENTIAL); + log.info("Node started join [nodeId=" + locNode.id() + + ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + + ", nodePath=" + locNodeZkPath + ']'); + zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { onConnected(rc, children); @@ -439,38 +454,13 @@ public class ZookeeperDiscoveryImpl { assert prevE != null; - final int crdInternalId = crdE.getKey(); - final int locInternalId0 = locInternalId; - log.info("Discovery coordinator already exists, watch for previous node [" + "locId=" + locNode.id() + ", prevPath=" + prevE.getValue() + ']'); - zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), new Watcher() { - @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDeleted) { - try { - onPreviousNodeFail(aliveNodes, crdInternalId, locInternalId0); - } - catch (Throwable e) { - onFatalError(e); - } - } - } - }, new AsyncCallback.StatCallback() { - @Override public void processResult(int rc, String path, Object ctx, Stat stat) { - assert rc == 0 : rc; - - if (stat == null) { - try { - onPreviousNodeFail(aliveNodes, crdInternalId, locInternalId0); - } - catch (Throwable e) { - onFatalError(e); - } - } - } - }); + PreviousNodeWatcher watcher = new 
PreviousNodeWatcher(); + + zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + prevE.getValue(), watcher, watcher); } } catch (Throwable e) { @@ -478,7 +468,44 @@ public class ZookeeperDiscoveryImpl { } } - private void onPreviousNodeFail(List<String> aliveNodes, int crdInternalId, int locInternalId) throws Exception { + /** + * + */ + private class PreviousNodeWatcher implements Watcher, AsyncCallback.StatCallback { + @Override public void process(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDeleted) { + try { + onPreviousNodeFail(); + } + catch (Throwable e) { + onFatalError(e); + } + } + else { + if (log.isInfoEnabled()) + log.info("Previous node watch event: " + evt); + + zkClient.existsAsync(evt.getPath(), this, this); + } + } + + @Override public void processResult(int rc, String path, Object ctx, Stat stat) { + log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']'); + + assert rc == 0 : rc; + + if (stat == null) { + try { + onPreviousNodeFail(); + } + catch (Throwable e) { + onFatalError(e); + } + } + } + } + + private void onPreviousNodeFail() throws Exception { // TODO ZK: // if (locInternalId == crdInternalId + 1) { // if (log.isInfoEnabled()) @@ -517,7 +544,7 @@ public class ZookeeperDiscoveryImpl { assert this.evtsData != null; for (ZkDiscoveryEventData evtData : evtsData.evts.values()) - evtData.remainingAcks(top.nodesByOrder.values()); + evtData.initRemainingAcks(top.nodesByOrder.values()); handleProcessedEvents(); } @@ -643,7 +670,7 @@ public class ZookeeperDiscoveryImpl { if (!alives.containsKey(e.getKey())) { ZookeeperClusterNode failedNode = e.getValue(); - processEventAcksOnNodeFail(failedNode); + handleProcessedEventsOnNodeFail(failedNode); generateNodeFail(curTop, failedNode); @@ -690,10 +717,8 @@ public class ZookeeperDiscoveryImpl { evtsData.addEvent(curTop.values(), evtData); - if (log.isInfoEnabled()) { - log.info("Generated NODE_FAILED event [topVer=" + evtData.topologyVersion() + - 
", nodeId=" + failedNode.id() + ']'); - } + if (log.isInfoEnabled()) + log.info("Generated NODE_FAILED event [evt=" + evtData + ']'); } /** @@ -708,9 +733,13 @@ public class ZookeeperDiscoveryImpl { throws Exception { UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath); - int joinSeq = ZkIgnitePaths.aliveJoinSequence(aliveNodePath); + int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath); - String joinDataPath = zkPaths.joinDataDir + '/' + nodeId.toString() + "|" + String.format("%010d", joinSeq); + String joinDataPath = zkPaths.joinDataDir + '/' + + ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" + + nodeId.toString() + + "|" + + String.format("%010d", joinSeq); byte[] joinData; @@ -766,6 +795,8 @@ public class ZookeeperDiscoveryImpl { evtsData.addEvent(dataForJoined.topology(), evtData); + evtData.addRemainingAck(joinedNode); // Topology for joined node does not contain joined node. + long start = System.currentTimeMillis(); zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), joinData, PERSISTENT); @@ -773,11 +804,8 @@ public class ZookeeperDiscoveryImpl { long time = System.currentTimeMillis() - start; - if (log.isInfoEnabled()) { - log.info("Generated NODE_JOINED event [topVer=" + evtData.topologyVersion() + - ", nodeId=" + joinedNode.id() + - ", addDataTime=" + time + ']'); - } + if (log.isInfoEnabled()) + log.info("Generated NODE_JOINED event [evt=" + evtData + ", addDataTime=" + time + ']'); } /** @@ -905,7 +933,7 @@ public class ZookeeperDiscoveryImpl { evtsData.addEvent(top.nodesByOrder.values(), evtData); if (log.isInfoEnabled()) - log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']'); + log.info("Generated CUSTOM event [evt=" + evtData + ", msg=" + msg + ']'); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal custom discovery message: " + e, e); @@ -945,9 +973,6 @@ public class ZookeeperDiscoveryImpl { this.evtsData = newEvtsData; } - /** */ - private int 
procEvtCnt; - /** * @param evtsData Events. * @throws Exception If failed. @@ -1067,83 +1092,6 @@ public class ZookeeperDiscoveryImpl { } /** - * @throws Exception If failed. - */ - private void handleProcessedEvents() throws Exception { - Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator(); - - List<ZkDiscoveryCustomEventData> newEvts = null; - - while (it.hasNext()) { - ZkDiscoveryEventData evtData = it.next(); - - if (evtData.allAcksReceived()) { - switch (evtData.eventType()) { - case EventType.EVT_NODE_JOINED: { - processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData); - - break; - } - - case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { - DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData); - - if (ack != null) { - evtsData.evtIdGen++; - - long evtId = evtsData.evtIdGen; - - byte[] ackBytes = marshal(ack); - - String evtChildPath = String.valueOf(evtId); - - zkClient.createIfNeeded( - zkPaths.customEventDataPath(true, evtChildPath), - ackBytes, - CreateMode.PERSISTENT); - - ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData( - evtId, - evtData.topologyVersion(), // Use topology version from original event. - locNode.id(), - evtChildPath, - true); - - ackEvtData.msg = ack; - - if (newEvts == null) - newEvts = new ArrayList<>(); - - newEvts.add(ackEvtData); - - if (log.isInfoEnabled()) - log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + ack + ']'); - } - - break; - } - - case EventType.EVT_NODE_FAILED: { - log.info("All nodes processed node fail [evtId=" + evtData.eventId() + ']'); - - // Do not need cleanup. - break; - } - } - - it.remove(); - } - } - - if (newEvts != null) { - for (int i = 0; i < newEvts.size(); i++) - evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i)); - - saveAndProcessNewEvents(); - } - } - - /** * @param evtsData Events data. * @param evtData Local join event data. * @throws Exception If failed. 
@@ -1260,10 +1208,87 @@ public class ZookeeperDiscoveryImpl { } /** + * @throws Exception If failed. + */ + private void handleProcessedEvents() throws Exception { + Iterator<ZkDiscoveryEventData> it = this.evtsData.evts.values().iterator(); + + List<ZkDiscoveryCustomEventData> newEvts = null; + + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); + + if (evtData.allAcksReceived()) { + switch (evtData.eventType()) { + case EventType.EVT_NODE_JOINED: { + handleProcessedJoinEvent((ZkDiscoveryNodeJoinEventData)evtData); + + break; + } + + case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { + DiscoverySpiCustomMessage ack = handleProcessedCustomEvent((ZkDiscoveryCustomEventData)evtData); + + if (ack != null) { + evtsData.evtIdGen++; + + long evtId = evtsData.evtIdGen; + + byte[] ackBytes = marshal(ack); + + String evtChildPath = String.valueOf(evtId); + + zkClient.createIfNeeded( + zkPaths.customEventDataPath(true, evtChildPath), + ackBytes, + CreateMode.PERSISTENT); + + ZkDiscoveryCustomEventData ackEvtData = new ZkDiscoveryCustomEventData( + evtId, + evtData.topologyVersion(), // Use topology version from original event. + locNode.id(), + evtChildPath, + true); + + ackEvtData.msg = ack; + + if (newEvts == null) + newEvts = new ArrayList<>(); + + newEvts.add(ackEvtData); + + if (log.isInfoEnabled()) + log.info("Generated CUSTOM event ack [evt=" + evtData + ", msg=" + ack + ']'); + } + + break; + } + + case EventType.EVT_NODE_FAILED: { + if (log.isInfoEnabled()) + log.info("All nodes processed node fail [evtData=" + evtData + ']'); + + break; // Do not need addition cleanup. + } + } + + it.remove(); + } + } + + if (newEvts != null) { + for (int i = 0; i < newEvts.size(); i++) + evtsData.addEvent(top.nodesByOrder.values(), newEvts.get(i)); + + saveAndProcessNewEvents(); + } + } + + /** * @param failedNode Failed node. * @throws Exception If failed. 
*/ - private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) throws Exception { + private void handleProcessedEventsOnNodeFail(ZookeeperClusterNode failedNode) throws Exception { boolean processed = false; for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = evtsData.evts.entrySet().iterator(); it.hasNext();) { @@ -1283,8 +1308,8 @@ public class ZookeeperDiscoveryImpl { * @param evtData Event data. * @throws Exception If failed. */ - private void processNodesAckJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { - log.info("All nodes processed node join [evtId=" + evtData.eventId() + ']'); + private void handleProcessedJoinEvent(ZkDiscoveryNodeJoinEventData evtData) throws Exception { + log.info("All nodes processed node join [evtData=" + evtData + ']'); zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), -1); zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), -1); @@ -1298,7 +1323,7 @@ public class ZookeeperDiscoveryImpl { @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(ZkDiscoveryCustomEventData evtData) throws Exception { - log.info("All nodes processed custom event [evtId=" + evtData.eventId() + ']'); + log.info("All nodes processed custom event [evtData=" + evtData + ']'); if (!evtData.ackEvent()) { zkClient.deleteIfExists(zkPaths.customEventDataPath(false, evtData.evtPath), -1); http://git-wip-us.apache.org/repos/asf/ignite/blob/9ffd603d/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 8b3a117..39f9fbf 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.test.TestingCluster; @@ -183,14 +185,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { zkCluster = new TestingCluster(3); zkCluster.start(); } - - System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); } /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { - System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD); - if (zkCluster != null) { try { zkCluster.close(); @@ -205,6 +203,20 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { super.afterTestsStopped(); } + /** + * + */ + private static void ackEveryEventSystemProperty() { + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + + /** + * + */ + private void clearAckEveryEventSystemProperty() { + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -226,6 +238,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { stopAllGrids(); } + + clearAckEveryEventSystemProperty(); } /** @@ -301,6 +315,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * 
@throws Exception If failed. */ public void testCustomEventsSimple1_SingleNode() throws Exception { + ackEveryEventSystemProperty(); + Ignite srv0 = startGrid(0); srv0.createCache(new CacheConfiguration<>("c1")); @@ -312,6 +328,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCustomEventsSimple1_5_Nodes() throws Exception { + ackEveryEventSystemProperty(); + Ignite srv0 = startGrids(5); srv0.createCache(new CacheConfiguration<>("c1")); @@ -662,7 +680,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGrids(2); for (Ignite node : G.allGrids()) { - IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); + IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME); assertNotNull(cache); @@ -680,6 +698,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStop_2_Nodes() throws Exception { + ackEveryEventSystemProperty(); + startGrid(0); waitForTopology(1); @@ -700,6 +720,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStop1() throws Exception { + ackEveryEventSystemProperty(); + startGridsMultiThreaded(5, false); waitForTopology(5); @@ -725,24 +747,46 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param node Node. * @throws Exception If failed.
*/ - private void waitForEventsAcks(final Ignite node) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), - "impl", "evtsData", "evts"); + public void testStartStop3() throws Exception { + startGrids(4); - if (!evts.isEmpty()) { - info("Unacked events: " + evts); + awaitPartitionMapExchange(); - return false; - } + stopGrid(0); - return true; - } - }, 10_000)); + startGrid(5); + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop4() throws Exception { + startGrids(6); + + awaitPartitionMapExchange(); + + stopGrid(2); + // Each stop below is randomly either followed by an exchange wait or not. + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + stopGrid(1); + + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + stopGrid(0); + + if (ThreadLocalRandom.current().nextBoolean()) + awaitPartitionMapExchange(); + + startGrid(7); + + awaitPartitionMapExchange(); } /** @@ -796,6 +840,106 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testTopologyChangeAndZkRestart() throws Exception { + // TODO(review): empty stub; presumably meant to call topologyChangeWithRestarts(true). + } + + /** + * @param restartZk Presumably whether to restart the ZooKeeper cluster; currently unused. + * @throws Exception If failed. + */ + private void topologyChangeWithRestarts(boolean restartZk) throws Exception { + startGrid(0); + + long stopTime = System.currentTimeMillis(); // NOTE(review): unused; method looks unfinished. + } + + /** + * @throws Exception If failed.
+ */ + public void testRandomTopologyChanges() throws Exception { + List<Integer> startedNodes = new ArrayList<>(); + List<String> startedCaches = new ArrayList<>(); + + int nextNodeIdx = 0; + int nextCacheIdx = 0; + + long stopTime = System.currentTimeMillis() + 60_000; + + int MAX_NODES = 20; + int MAX_CACHES = 10; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + // Once a node exists, ~1 in 10 iterations creates/destroys a cache; otherwise a node is started/stopped. + while (System.currentTimeMillis() < stopTime) { + if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) { + boolean startCache = startedCaches.size() < 2 || + (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0); + + int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); + + if (startCache) { + String cacheName = "cache-" + nextCacheIdx++; + + log.info("Next, start new cache [cacheName=" + cacheName + + ", node=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + + ", curCaches=" + startedCaches.size() + ']'); + + ignite(nodeIdx).createCache(new CacheConfiguration<>(cacheName)); + + startedCaches.add(cacheName); + } + else { + if (startedCaches.size() > 1) { + String cacheName = startedCaches.get(rnd.nextInt(startedCaches.size())); + + log.info("Next, stop cache [cacheName=" + cacheName + + ", node=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + + ", curCaches=" + startedCaches.size() + ']'); + + ignite(nodeIdx).destroyCache(cacheName); + + assertTrue(startedCaches.remove(cacheName)); + } + } + } + else { + boolean startNode = startedNodes.size() < 2 || + (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0); + + if (startNode) { + int nodeIdx = nextNodeIdx++; + + log.info("Next, start new node [nodeIdx=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ?
null : Collections.min(startedNodes)) + + ", curNodes=" + startedNodes.size() + ']'); + + startGrid(nodeIdx); + + assertTrue(startedNodes.add(nodeIdx)); + } + else { + if (startedNodes.size() > 1) { + int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); + + log.info("Next, stop [nodeIdx=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + + ", curNodes=" + startedNodes.size() + ']'); + + stopGrid(nodeIdx); + + assertTrue(startedNodes.remove((Integer)nodeIdx)); + } + } + } + } + } + + /** * */ private void reset() { @@ -811,6 +955,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @param node Node. + * @throws Exception If failed. + */ + private void waitForEventsAcks(final Ignite node) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), + "impl", "evtsData", "evts"); + + if (!evts.isEmpty()) { + info("Unacked events: " + evts); + + return false; + } + + return true; + } + }, 10_000)); + } + + /** * @throws Exception If failed. */ private void checkEventsConsistency() throws Exception {