Repository: ignite Updated Branches: refs/heads/ignite-zk a4be5afd0 -> 97e851794
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97e85179 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97e85179 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97e85179 Branch: refs/heads/ignite-zk Commit: 97e85179418bc369066c26ec086edd138419c406 Parents: a4be5af Author: sboikov <sboi...@gridgain.com> Authored: Mon Nov 20 17:21:33 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Nov 20 17:46:24 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 2 +- .../zk/internal/ZkDiscoveryCustomEventData.java | 53 +++++ .../zk/internal/ZkDiscoveryEventData.java | 14 +- .../zk/internal/ZkDiscoveryEventsData.java | 8 +- .../internal/ZkDiscoveryNodeFailEventData.java | 12 +- .../internal/ZkDiscoveryNodeJoinEventData.java | 5 +- .../spi/discovery/zk/internal/ZkPaths.java | 20 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 210 ++++++++++++++++--- .../zk/ZookeeperDiscoverySpiBasicTest.java | 18 ++ 9 files changed, 300 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 75f4f36..ee0209b 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -207,7 +207,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { - impl.sendCustomEvent(msg); + impl.sendCustomMessage(msg); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java new file mode 100644 index 0000000..cecb2dc --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.UUID; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; + +/** + * + */ +class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { + /** */ + final UUID sndNodeId; + + /** */ + final String evtPath; + + /** */ + transient DiscoverySpiCustomMessage msg; + + /** + * @param evtId Event ID. + * @param topVer Topology version. + * @param evtPath Event path. + */ + ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath) { + super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer); + + this.sndNodeId = sndNodeId; + this.evtPath = evtPath; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CustomEventData [topVer=" + topologyVersion() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java index fb05d14..3982c90 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java @@ -18,8 +18,6 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.io.Serializable; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -33,24 +31,30 @@ abstract class ZkDiscoveryEventData implements Serializable { private static final long serialVersionUID = 0L; /** */ - @GridToStringInclude + private final long evtId; + + /** */ private final int evtType; /** */ - @GridToStringInclude private final long topVer; /** * @param evtType Event type. * @param topVer Topology version. */ - ZkDiscoveryEventData(int evtType, long topVer) { + ZkDiscoveryEventData(long evtId, int evtType, long topVer) { assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_FAILED || evtType == EVT_DISCOVERY_CUSTOM_EVT : evtType; + this.evtId = evtId; this.evtType = evtType; this.topVer = topVer; } + long eventId() { + return evtId; + } + int eventType() { return evtType; } http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java index 92c4f24..d3f07ae 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java @@ -28,6 +28,12 @@ class ZkDiscoveryEventsData implements Serializable { private static final long serialVersionUID = 0L; /** */ + int procCustEvt = -1; + + /** */ + long evtIdGen; + + /** */ long topVer; /** */ @@ -51,7 +57,7 @@ class ZkDiscoveryEventsData implements Serializable { * @param evt Event. */ void addEvent(ZkDiscoveryEventData evt) { - Object old = evts.put(evt.topologyVersion(), evt); + Object old = evts.put(evt.eventId(), evt); assert old == null : old; } http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java index d7664d6..227bb94 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java @@ -26,12 +26,20 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData { /** */ private int failedNodeInternalId; - ZkDiscoveryNodeFailEventData(long topVer, int failedNodeInternalId) { - super(EventType.EVT_NODE_FAILED, topVer); + /** + * @param evtId Event ID. + * @param topVer Topology version. + * @param failedNodeInternalId Failed node ID. + */ + ZkDiscoveryNodeFailEventData(long evtId, long topVer, int failedNodeInternalId) { + super(evtId, EventType.EVT_NODE_FAILED, topVer); this.failedNodeInternalId = failedNodeInternalId; } + /** + * @return Failed node ID. + */ int failedNodeInternalId() { return failedNodeInternalId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index 36e37a2..5a828dc 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -34,12 +34,13 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { transient ZkJoiningNodeData joiningNodeData; /** + * @param evtId Event ID. * @param topVer Topology version. * @param nodeId Joined node ID. * @param joinedInternalId Joined node internal ID. */ - ZkDiscoveryNodeJoinEventData(long topVer, UUID nodeId, int joinedInternalId) { - super(EventType.EVT_NODE_JOINED, topVer); + ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) { + super(evtId, EventType.EVT_NODE_JOINED, topVer); this.nodeId = nodeId; this.joinedInternalId = joinedInternalId; http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java index 394ba59..643e10d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java @@ -30,6 +30,9 @@ class ZkPaths { private static final String JOIN_DATA_DIR = "joinData"; /** */ + private static final String CUSTOM_EVTS_DIR = "customEvts"; + + /** */ private static final String ALIVE_NODES_DIR = "alive"; /** */ @@ -53,6 +56,9 @@ class ZkPaths { /** */ final String evtsPath; + /** */ + final String customEvtsDir; + /** * @param basePath Base directory. * @param clusterName Cluster name. @@ -62,9 +68,11 @@ class ZkPaths { this.clusterName = clusterName; clusterDir = basePath + "/" + clusterName; + aliveNodesDir = zkPath(ALIVE_NODES_DIR); joinDataDir = zkPath(JOIN_DATA_DIR); evtsPath = zkPath(DISCO_EVENTS_PATH); + customEvtsDir = zkPath(CUSTOM_EVTS_DIR); } /** @@ -93,4 +101,16 @@ class ZkPaths { return Integer.parseInt(path.substring(idx1 + 1, idx2)); } + + static int customEventSequence(String path) { + int idx = path.lastIndexOf('|'); + + return Integer.parseInt(path.substring(idx + 1)); + } + + static UUID customEventSendNodeId(String path) { + String idStr = path.substring(0, ZkPaths.UUID_LEN); + + return UUID.fromString(idStr); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 9689762..f351b35 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -46,6 +47,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -104,6 +106,12 @@ public class ZookeeperDiscoveryImpl { /** */ private boolean joined; + /** */ + private ZkDiscoveryEventsData evts; + + /** */ + private boolean crd; + /** * @param log * @param basePath @@ -186,10 +194,37 @@ public class ZookeeperDiscoveryImpl { } } - public void sendCustomEvent(DiscoverySpiCustomMessage msg) { - // TODO ZK + /** + * @param msg Message. + */ + public void sendCustomMessage(DiscoverySpiCustomMessage msg) { + assert msg != null; + + byte[] msgBytes; + + try { + msgBytes = marshal(msg); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException("Failed to marshal custom message: " + msg, e); + } + + try { + zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() + '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL); + } + catch (ZookeeperClientFailedException e) { + throw new IgniteException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } } + /** + * @return Cluster start time. + */ public long gridStartTime() { return gridStartTime; } @@ -265,6 +300,8 @@ public class ZookeeperDiscoveryImpl { zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT); + zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT); + zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT); } catch (ZookeeperClientFailedException e) { @@ -272,9 +309,6 @@ public class ZookeeperDiscoveryImpl { } } - /** */ - private ZkDiscoveryEventsData crdEvts; - /** * @throws InterruptedException If interrupted. */ @@ -314,12 +348,6 @@ public class ZookeeperDiscoveryImpl { connStartLatch.await(); } - /** */ - private ZkDiscoveryEventsData evts; - - /** */ - private boolean crd; - /** * @param rc Async callback result. * @param aliveNodes Alive nodes. @@ -423,11 +451,15 @@ public class ZookeeperDiscoveryImpl { } } + /** + * @param locInternalId Local node's internal ID. + * @throws Exception If failed. + */ private void onBecomeCoordinator(int locInternalId) throws Exception { byte[] evtsData = zkClient.getData(zkPaths.evtsPath); if (evtsData.length > 0) - onEventsUpdate(evtsData, null); + onEventsUpdate(evtsData); crd = true; @@ -446,8 +478,13 @@ public class ZookeeperDiscoveryImpl { } zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback); + zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback); } + /** + * @param aliveNodes ZK nodes representing alive cluster nodes. + * @throws Exception If failed. + */ private void generateTopologyEvents(List<String> aliveNodes) throws Exception { assert crd; @@ -502,8 +539,11 @@ public class ZookeeperDiscoveryImpl { assert rmvd != null; evts.topVer++; + evts.evtIdGen++; - ZkDiscoveryEventData evtData = new ZkDiscoveryNodeFailEventData(evts.topVer, failedNode.internalId()); + ZkDiscoveryEventData evtData = new ZkDiscoveryNodeFailEventData(evts.evtIdGen, + evts.topVer, + failedNode.internalId()); evts.addEvent(evtData); @@ -542,6 +582,7 @@ public class ZookeeperDiscoveryImpl { assert nodeId.equals(joinedNode.id()) : joiningNodeData.node(); evts.topVer++; + evts.evtIdGen++; joinedNode.order(evts.topVer); joinedNode.internalId(internalId); @@ -566,7 +607,9 @@ public class ZookeeperDiscoveryImpl { assert old == null; - ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(evts.topVer, + ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData( + evts.evtIdGen, + evts.topVer, joinedNode.id(), joinedNode.internalId()); @@ -574,7 +617,7 @@ public class ZookeeperDiscoveryImpl { evts.addEvent(evtData); - String evtDataPath = zkPaths.evtsPath + "/" + evtData.topologyVersion(); + String evtDataPath = zkPaths.evtsPath + "/" + evtData.eventId(); long start = System.currentTimeMillis(); @@ -630,6 +673,9 @@ public class ZookeeperDiscoveryImpl { zkClient.delete(evtDir, -1); } + + for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir)) + zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1); } private void removeChildren(String path) throws Exception { @@ -638,14 +684,69 @@ public class ZookeeperDiscoveryImpl { } /** - * @param children - * @param stat + * @param customEvtNodes ZK nodes representing custom events to process. + * @throws Exception If failed. */ - private void onAliveNodesUpdate(List<String> children, Stat stat) throws Exception { - generateTopologyEvents(children); + private void generateCustomEvents(List<String> customEvtNodes) throws Exception { + assert crd; + + TreeMap<Integer, String> newEvts = null; + + for (int i = 0; i < customEvtNodes.size(); i++) { + String evtPath = customEvtNodes.get(i); + + int evtSeq = ZkPaths.customEventSequence(evtPath); + + if (evtSeq > evts.procCustEvt) { + if (newEvts == null) + newEvts = new TreeMap<>(); + + newEvts.put(evtSeq, evtPath); + } + } + + if (newEvts != null) { + for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { + UUID sndNodeId = ZkPaths.customEventSendNodeId(evtE.getValue()); + + byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); + + DiscoverySpiCustomMessage msg; + + try { + msg = unmarshal(evtBytes); + + evts.evtIdGen++; + + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( + evts.evtIdGen, + evts.topVer, + sndNodeId, + evtE.getValue()); + + evtData.msg = msg; + + evts.addEvent(evtData); + + if (log.isInfoEnabled()) + log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal custom discovery message: " + e, e); + } + + evts.procCustEvt = evtE.getKey(); + } + + onEventsUpdate(evts); + } } - private void onEventsUpdate(byte[] data, Stat stat) throws Exception { + /** + * @param data Marshalled events. + * @throws Exception If failed. + */ + private void onEventsUpdate(byte[] data) throws Exception { if (data.length == 0) return; @@ -660,6 +761,7 @@ public class ZookeeperDiscoveryImpl { /** * @param evtsData Events. + * @throws Exception If failed. */ private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws Exception { TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts; @@ -682,7 +784,7 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Local join event data: " + evtData + ']'); - String path = zkPaths.evtsPath + "/" + evtData.topologyVersion() + "/joined"; + String path = zkPaths.evtsPath + "/" + evtData.eventId() + "/joined"; ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path)); @@ -728,8 +830,13 @@ public class ZookeeperDiscoveryImpl { ZkJoiningNodeData joiningData; - if (!crd) { - String path = zkPaths.evtsPath + "/" + evtData.topologyVersion(); + if (crd) { + assert evtData0.joiningNodeData != null; + + joiningData = evtData0.joiningNodeData; + } + else { + String path = zkPaths.evtsPath + "/" + evtData.eventId(); joiningData = unmarshal(zkClient.getData(path)); @@ -739,11 +846,6 @@ public class ZookeeperDiscoveryImpl { exchange.onExchange(dataBag); } - else { - assert evtData0.joiningNodeData != null; - - joiningData = evtData0.joiningNodeData; - } notifyNodeJoin(evtData0, joiningData); @@ -756,6 +858,27 @@ public class ZookeeperDiscoveryImpl { break; } + case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { + ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + + DiscoverySpiCustomMessage msg; + + if (crd) { + assert evtData0.msg != null : evtData0; + + msg = evtData0.msg; + } + else { + String path = zkPaths.customEvtsDir + "/" + evtData0.evtPath; + + msg = unmarshal(zkClient.getData(path)); + } + + notifyCustomEvent(evtData0, msg); + + break; + } + default: assert false : "Invalid event: " + evtData; } @@ -768,6 +891,26 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. + * @param msg Custom message. + */ + @SuppressWarnings("unchecked") + private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) { + ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId); + + assert sndNode != null : evtData; + + List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values()); + + lsnr.onDiscovery(evtData.eventType(), + evtData.topologyVersion(), + sndNode, + topSnapshot, + Collections.<Long, Collection<ClusterNode>>emptyMap(), + msg); + } + + /** + * @param evtData Event data. * @param joiningData Joining node data. */ @SuppressWarnings("unchecked") @@ -792,6 +935,7 @@ public class ZookeeperDiscoveryImpl { /** * @param evtData Event data. */ + @SuppressWarnings("unchecked") private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) { ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId()); @@ -821,7 +965,7 @@ public class ZookeeperDiscoveryImpl { * @param e Error. */ private void onFatalError(Throwable e) { - // TODO ZL + // TODO ZK U.error(log, "Failed to process discovery data. Stopping the node in order to prevent cluster wide instability.", e); joinFut.onDone(e); @@ -893,6 +1037,8 @@ public class ZookeeperDiscoveryImpl { else if (evt.getType() == Event.EventType.NodeChildrenChanged) { if (evt.getPath().equals(zkPaths.aliveNodesDir)) zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); + else if (evt.getPath().equals(zkPaths.customEvtsDir)) + zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback); else U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath()); } @@ -909,7 +1055,9 @@ public class ZookeeperDiscoveryImpl { assert rc == 0 : rc; if (path.equals(zkPaths.aliveNodesDir)) - onAliveNodesUpdate(children, stat); + generateTopologyEvents(children); + else if (path.equals(zkPaths.customEvtsDir)) + generateCustomEvents(children); else U.warn(log, "Children callback for unexpected path: " + path); } @@ -930,7 +1078,7 @@ public class ZookeeperDiscoveryImpl { if (path.equals(zkPaths.evtsPath)) { if (!crd) - onEventsUpdate(data, stat); + onEventsUpdate(data); } else U.warn(log, "Data callback for unknown path: " + path); http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 0b9c2e4..162cf76 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -230,6 +230,24 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCustomEventsSimple1_SingleNode() throws Exception { + Ignite srv0 = startGrid(0); + + srv0.createCache(new CacheConfiguration<>("c1")); + } + + /** + * @throws Exception If failed. + */ + public void testCustomEventsSimple1_5_Nodes() throws Exception { + Ignite srv0 = startGrids(2); + + srv0.createCache(new CacheConfiguration<>("c1")); + } + + /** + * @throws Exception If failed. + */ public void testSegmentation1() throws Exception { sesTimeout = 2000; testSockNio = true;