Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a4be5afd0 -> 97e851794


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/97e85179
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/97e85179
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/97e85179

Branch: refs/heads/ignite-zk
Commit: 97e85179418bc369066c26ec086edd138419c406
Parents: a4be5af
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Nov 20 17:21:33 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Nov 20 17:46:24 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   2 +-
 .../zk/internal/ZkDiscoveryCustomEventData.java |  53 +++++
 .../zk/internal/ZkDiscoveryEventData.java       |  14 +-
 .../zk/internal/ZkDiscoveryEventsData.java      |   8 +-
 .../internal/ZkDiscoveryNodeFailEventData.java  |  12 +-
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   5 +-
 .../spi/discovery/zk/internal/ZkPaths.java      |  20 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 210 ++++++++++++++++---
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |  18 ++
 9 files changed, 300 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 75f4f36..ee0209b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -207,7 +207,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
-        impl.sendCustomEvent(msg);
+        impl.sendCustomMessage(msg);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
new file mode 100644
index 0000000..cecb2dc
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ *
+ */
+class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
+    /** */
+    final UUID sndNodeId;
+
+    /** */
+    final String evtPath;
+
+    /** */
+    transient DiscoverySpiCustomMessage msg;
+
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param evtPath Event path.
+     */
+    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath) {
+        super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
+
+        this.sndNodeId = sndNodeId;
+        this.evtPath = evtPath;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "CustomEventData [topVer=" + topologyVersion() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index fb05d14..3982c90 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -33,24 +31,30 @@ abstract class ZkDiscoveryEventData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    @GridToStringInclude
+    private final long evtId;
+
+    /** */
     private final int evtType;
 
     /** */
-    @GridToStringInclude
     private final long topVer;
 
     /**
      * @param evtType Event type.
      * @param topVer Topology version.
      */
-    ZkDiscoveryEventData(int evtType, long topVer) {
+    ZkDiscoveryEventData(long evtId, int evtType, long topVer) {
         assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_FAILED || evtType == EVT_DISCOVERY_CUSTOM_EVT : evtType;
 
+        this.evtId = evtId;
         this.evtType = evtType;
         this.topVer = topVer;
     }
 
+    long eventId() {
+        return evtId;
+    }
+
     int eventType() {
         return evtType;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index 92c4f24..d3f07ae 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -28,6 +28,12 @@ class ZkDiscoveryEventsData implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    int procCustEvt = -1;
+
+    /** */
+    long evtIdGen;
+
+    /** */
     long topVer;
 
     /** */
@@ -51,7 +57,7 @@ class ZkDiscoveryEventsData implements Serializable {
      * @param evt Event.
      */
     void addEvent(ZkDiscoveryEventData evt) {
-        Object old = evts.put(evt.topologyVersion(), evt);
+        Object old = evts.put(evt.eventId(), evt);
 
         assert old == null : old;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
index d7664d6..227bb94 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -26,12 +26,20 @@ class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
     /** */
     private int failedNodeInternalId;
 
-    ZkDiscoveryNodeFailEventData(long topVer, int failedNodeInternalId) {
-        super(EventType.EVT_NODE_FAILED, topVer);
+    /**
+     * @param evtId Event ID.
+     * @param topVer Topology version.
+     * @param failedNodeInternalId Failed node ID.
+     */
+    ZkDiscoveryNodeFailEventData(long evtId, long topVer, int failedNodeInternalId) {
+        super(evtId, EventType.EVT_NODE_FAILED, topVer);
 
         this.failedNodeInternalId = failedNodeInternalId;
     }
 
+    /**
+     * @return Failed node ID.
+     */
     int failedNodeInternalId() {
         return failedNodeInternalId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
index 36e37a2..5a828dc 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -34,12 +34,13 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
     transient ZkJoiningNodeData joiningNodeData;
 
     /**
+     * @param evtId Event ID.
      * @param topVer Topology version.
      * @param nodeId Joined node ID.
      * @param joinedInternalId Joined node internal ID.
      */
-    ZkDiscoveryNodeJoinEventData(long topVer, UUID nodeId, int joinedInternalId) {
-        super(EventType.EVT_NODE_JOINED, topVer);
+    ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) {
+        super(evtId, EventType.EVT_NODE_JOINED, topVer);
 
         this.nodeId = nodeId;
         this.joinedInternalId = joinedInternalId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
index 394ba59..643e10d 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
@@ -30,6 +30,9 @@ class ZkPaths {
     private static final String JOIN_DATA_DIR = "joinData";
 
     /** */
+    private static final String CUSTOM_EVTS_DIR = "customEvts";
+
+    /** */
     private static final String ALIVE_NODES_DIR = "alive";
 
     /** */
@@ -53,6 +56,9 @@ class ZkPaths {
     /** */
     final String evtsPath;
 
+    /** */
+    final String customEvtsDir;
+
     /**
      * @param basePath Base directory.
      * @param clusterName Cluster name.
@@ -62,9 +68,11 @@ class ZkPaths {
         this.clusterName = clusterName;
 
         clusterDir = basePath + "/" + clusterName;
+
         aliveNodesDir = zkPath(ALIVE_NODES_DIR);
         joinDataDir = zkPath(JOIN_DATA_DIR);
         evtsPath = zkPath(DISCO_EVENTS_PATH);
+        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
     }
 
     /**
@@ -93,4 +101,16 @@ class ZkPaths {
 
         return Integer.parseInt(path.substring(idx1 + 1, idx2));
     }
+
+    static int customEventSequence(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    static UUID customEventSendNodeId(String path) {
+        String idStr = path.substring(0, ZkPaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 9689762..f351b35 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -46,6 +47,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -104,6 +106,12 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private boolean joined;
 
+    /** */
+    private ZkDiscoveryEventsData evts;
+
+    /** */
+    private boolean crd;
+
     /**
      * @param log
      * @param basePath
@@ -186,10 +194,37 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
-        // TODO ZK
+    /**
+     * @param msg Message.
+     */
+    public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
+        assert msg != null;
+
+        byte[] msgBytes;
+
+        try {
+            msgBytes = marshal(msg);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
+        }
+
+        try {
+            zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() + '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL);
+        }
+        catch (ZookeeperClientFailedException e) {
+            throw new IgniteException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedException(e);
+        }
     }
 
+    /**
+     * @return Cluster start time.
+     */
     public long gridStartTime() {
         return gridStartTime;
     }
@@ -265,6 +300,8 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT);
 
+            zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT);
+
             zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT);
         }
         catch (ZookeeperClientFailedException e) {
@@ -272,9 +309,6 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    /** */
-    private ZkDiscoveryEventsData crdEvts;
-
     /**
      * @throws InterruptedException If interrupted.
      */
@@ -314,12 +348,6 @@ public class ZookeeperDiscoveryImpl {
         connStartLatch.await();
     }
 
-    /** */
-    private ZkDiscoveryEventsData evts;
-
-    /** */
-    private boolean crd;
-
     /**
      * @param rc Async callback result.
      * @param aliveNodes Alive nodes.
@@ -423,11 +451,15 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    /**
+     * @param locInternalId Local node's internal ID.
+     * @throws Exception If failed.
+     */
     private void onBecomeCoordinator(int locInternalId) throws Exception {
         byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
 
         if (evtsData.length > 0)
-            onEventsUpdate(evtsData, null);
+            onEventsUpdate(evtsData);
 
         crd = true;
 
@@ -446,8 +478,13 @@ public class ZookeeperDiscoveryImpl {
         }
 
         zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, childrenCallback);
+        zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, childrenCallback);
     }
 
+    /**
+     * @param aliveNodes ZK nodes representing alive cluster nodes.
+     * @throws Exception If failed.
+     */
     private void generateTopologyEvents(List<String> aliveNodes) throws Exception {
         assert crd;
 
@@ -502,8 +539,11 @@ public class ZookeeperDiscoveryImpl {
         assert rmvd != null;
 
         evts.topVer++;
+        evts.evtIdGen++;
 
-        ZkDiscoveryEventData evtData = new ZkDiscoveryNodeFailEventData(evts.topVer, failedNode.internalId());
+        ZkDiscoveryEventData evtData = new ZkDiscoveryNodeFailEventData(evts.evtIdGen,
+            evts.topVer,
+            failedNode.internalId());
 
         evts.addEvent(evtData);
 
@@ -542,6 +582,7 @@ public class ZookeeperDiscoveryImpl {
         assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
         evts.topVer++;
+        evts.evtIdGen++;
 
         joinedNode.order(evts.topVer);
         joinedNode.internalId(internalId);
@@ -566,7 +607,9 @@ public class ZookeeperDiscoveryImpl {
 
         assert old == null;
 
-        ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(evts.topVer,
+        ZkDiscoveryNodeJoinEventData evtData = new ZkDiscoveryNodeJoinEventData(
+            evts.evtIdGen,
+            evts.topVer,
             joinedNode.id(),
             joinedNode.internalId());
 
@@ -574,7 +617,7 @@ public class ZookeeperDiscoveryImpl {
 
         evts.addEvent(evtData);
 
-        String evtDataPath = zkPaths.evtsPath + "/" + evtData.topologyVersion();
+        String evtDataPath = zkPaths.evtsPath + "/" + evtData.eventId();
 
         long start = System.currentTimeMillis();
 
@@ -630,6 +673,9 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.delete(evtDir, -1);
         }
+
+        for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir))
+            zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1);
     }
 
     private void removeChildren(String path) throws Exception {
@@ -638,14 +684,69 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @param children
-     * @param stat
+     * @param customEvtNodes ZK nodes representing custom events to process.
+     * @throws Exception If failed.
      */
-    private void onAliveNodesUpdate(List<String> children, Stat stat) throws Exception {
-        generateTopologyEvents(children);
+    private void generateCustomEvents(List<String> customEvtNodes) throws Exception {
+        assert crd;
+
+        TreeMap<Integer, String> newEvts = null;
+
+        for (int i = 0; i < customEvtNodes.size(); i++) {
+            String evtPath = customEvtNodes.get(i);
+
+            int evtSeq = ZkPaths.customEventSequence(evtPath);
+
+            if (evtSeq > evts.procCustEvt) {
+                if (newEvts == null)
+                    newEvts = new TreeMap<>();
+
+                newEvts.put(evtSeq, evtPath);
+            }
+        }
+
+        if (newEvts != null) {
+            for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
+                UUID sndNodeId = ZkPaths.customEventSendNodeId(evtE.getValue());
+
+                byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue());
+
+                DiscoverySpiCustomMessage msg;
+
+                try {
+                    msg = unmarshal(evtBytes);
+
+                    evts.evtIdGen++;
+
+                    ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+                        evts.evtIdGen,
+                        evts.topVer,
+                        sndNodeId,
+                        evtE.getValue());
+
+                    evtData.msg = msg;
+
+                    evts.addEvent(evtData);
+
+                    if (log.isInfoEnabled())
+                        log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
+                }
+
+                evts.procCustEvt = evtE.getKey();
+            }
+
+            onEventsUpdate(evts);
+        }
     }
 
-    private void onEventsUpdate(byte[] data, Stat stat) throws Exception {
+    /**
+     * @param data Marshalled events.
+     * @throws Exception If failed.
+     */
+    private void onEventsUpdate(byte[] data) throws Exception {
         if (data.length == 0)
             return;
 
@@ -660,6 +761,7 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param evtsData Events.
+     * @throws Exception If failed.
      */
     private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
@@ -682,7 +784,7 @@ public class ZookeeperDiscoveryImpl {
                     if (log.isInfoEnabled())
                         log.info("Local join event data: " + evtData + ']');
 
-                    String path = zkPaths.evtsPath + "/" + evtData.topologyVersion() + "/joined";
+                    String path = zkPaths.evtsPath + "/" + evtData.eventId() + "/joined";
 
                     ZkJoinEventDataForJoined dataForJoined = unmarshal(zkClient.getData(path));
 
@@ -728,8 +830,13 @@ public class ZookeeperDiscoveryImpl {
 
                         ZkJoiningNodeData joiningData;
 
-                        if (!crd) {
-                            String path = zkPaths.evtsPath + "/" + evtData.topologyVersion();
+                        if (crd) {
+                            assert evtData0.joiningNodeData != null;
+
+                            joiningData = evtData0.joiningNodeData;
+                        }
+                        else {
+                            String path = zkPaths.evtsPath + "/" + evtData.eventId();
 
                             joiningData = unmarshal(zkClient.getData(path));
 
@@ -739,11 +846,6 @@ public class ZookeeperDiscoveryImpl {
 
                             exchange.onExchange(dataBag);
                         }
-                        else {
-                            assert evtData0.joiningNodeData != null;
-
-                            joiningData = evtData0.joiningNodeData;
-                        }
 
                         notifyNodeJoin(evtData0, joiningData);
 
@@ -756,6 +858,27 @@ public class ZookeeperDiscoveryImpl {
                         break;
                     }
 
+                    case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
+                        ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;
+
+                        DiscoverySpiCustomMessage msg;
+
+                        if (crd) {
+                            assert evtData0.msg != null : evtData0;
+
+                            msg = evtData0.msg;
+                        }
+                        else {
+                            String path = zkPaths.customEvtsDir + "/" + evtData0.evtPath;
+
+                            msg = unmarshal(zkClient.getData(path));
+                        }
+
+                        notifyCustomEvent(evtData0, msg);
+
+                        break;
+                    }
+
                     default:
                         assert false : "Invalid event: " + evtData;
                 }
@@ -768,6 +891,26 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param evtData Event data.
+     * @param msg Custom message.
+     */
+    @SuppressWarnings("unchecked")
+    private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) {
+        ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId);
+
+        assert sndNode != null : evtData;
+
+        List<ClusterNode> topSnapshot = new ArrayList<>((Collection)top.nodesByOrder.values());
+
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            sndNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            msg);
+    }
+
+    /**
+     * @param evtData Event data.
      * @param joiningData Joining node data.
      */
     @SuppressWarnings("unchecked")
@@ -792,6 +935,7 @@ public class ZookeeperDiscoveryImpl {
     /**
      * @param evtData Event data.
      */
+    @SuppressWarnings("unchecked")
     private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) {
         ZookeeperClusterNode failedNode = top.removeNode(evtData.failedNodeInternalId());
 
@@ -821,7 +965,7 @@ public class ZookeeperDiscoveryImpl {
      * @param e Error.
      */
     private void onFatalError(Throwable e) {
-        // TODO ZL
+        // TODO ZK
         U.error(log, "Failed to process discovery data. Stopping the node in order to prevent cluster wide instability.", e);
 
         joinFut.onDone(e);
@@ -893,6 +1037,8 @@ public class ZookeeperDiscoveryImpl {
             else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
                 if (evt.getPath().equals(zkPaths.aliveNodesDir))
                     zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
+                else if (evt.getPath().equals(zkPaths.customEvtsDir))
+                    zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
                 else
                     U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
             }
@@ -909,7 +1055,9 @@ public class ZookeeperDiscoveryImpl {
                 assert rc == 0 : rc;
 
                 if (path.equals(zkPaths.aliveNodesDir))
-                    onAliveNodesUpdate(children, stat);
+                    generateTopologyEvents(children);
+                else if (path.equals(zkPaths.customEvtsDir))
+                    generateCustomEvents(children);
                 else
                     U.warn(log, "Children callback for unexpected path: " + path);
             }
@@ -930,7 +1078,7 @@ public class ZookeeperDiscoveryImpl {
 
                 if (path.equals(zkPaths.evtsPath)) {
                     if (!crd)
-                        onEventsUpdate(data, stat);
+                        onEventsUpdate(data);
                 }
                 else
                     U.warn(log, "Data callback for unknown path: " + path);

http://git-wip-us.apache.org/repos/asf/ignite/blob/97e85179/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 0b9c2e4..162cf76 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -230,6 +230,24 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCustomEventsSimple1_SingleNode() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_5_Nodes() throws Exception {
+        Ignite srv0 = startGrids(2);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSegmentation1() throws Exception {
         sesTimeout = 2000;
         testSockNio = true;

Reply via email to