Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 97e851794 -> fcee8c846


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcee8c84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcee8c84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcee8c84

Branch: refs/heads/ignite-zk
Commit: fcee8c846274890f8eee8cc8f3644cdda912dedd
Parents: 97e8517
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 21 12:24:58 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 21 12:24:58 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkDiscoveryCustomEventData.java |  7 ++-
 .../discovery/zk/internal/ZookeeperClient.java  | 12 ++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 59 +++++++++++++-------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |  8 ++-
 4 files changed, 64 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index cecb2dc..1346c24 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.UUID;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 
 /**
@@ -37,17 +38,21 @@ class ZkDiscoveryCustomEventData extends 
ZkDiscoveryEventData {
     /**
      * @param evtId Event ID.
      * @param topVer Topology version.
+     * @param sndNodeId Sender node ID.
      * @param evtPath Event path.
      */
     ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String 
evtPath) {
         super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
 
+        assert sndNodeId != null;
+        assert !F.isEmpty(evtPath);
+
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "CustomEventData [topVer=" + topologyVersion() + ']';
+        return "CustomEventData [topVer=" + topologyVersion() + ", sndNode=" + 
sndNodeId + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 95d548b..6393b90 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -252,6 +252,18 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    void deleteIfExists(String path, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        try {
+            delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // No-op if node does not exist.
+        }
+    }
+
+
     void delete(String path, int ver)
         throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index f351b35..ece71f9 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -518,12 +518,10 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1);
 
-            // TODO ZK: on crd do not need listen for events path.
-
             long time = System.currentTimeMillis() - start;
 
             if (log.isInfoEnabled())
-                log.info("Discovery coordinator saved new events [topVer=" + 
evts.topVer + ", saveTime=" + time + ']');
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + evts.topVer + ", saveTime=" + time + ']');
 
             onEventsUpdate(evts);
         }
@@ -709,35 +707,55 @@ public class ZookeeperDiscoveryImpl {
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
                 UUID sndNodeId = 
ZkPaths.customEventSendNodeId(evtE.getValue());
 
-                byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" 
+ evtE.getValue());
+                ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId);
 
-                DiscoverySpiCustomMessage msg;
+                String evtDataPath = zkPaths.customEvtsDir + "/" + 
evtE.getValue();
 
-                try {
-                    msg = unmarshal(evtBytes);
+                if (sndNode != null) {
+                    byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + 
"/" + evtE.getValue());
 
-                    evts.evtIdGen++;
+                    DiscoverySpiCustomMessage msg;
 
-                    ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
-                        evts.evtIdGen,
-                        evts.topVer,
-                        sndNodeId,
-                        evtE.getValue());
+                    try {
+                        msg = unmarshal(evtBytes);
 
-                    evtData.msg = msg;
+                        evts.evtIdGen++;
 
-                    evts.addEvent(evtData);
+                        ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
+                            evts.evtIdGen,
+                            evts.topVer,
+                            sndNodeId,
+                            evtE.getValue());
 
-                    if (log.isInfoEnabled())
-                        log.info("Generated CUSTOM event [topVer=" + 
evtData.topologyVersion() + ", evt=" + msg + ']');
+                        evtData.msg = msg;
+
+                        evts.addEvent(evtData);
+
+                        if (log.isInfoEnabled())
+                            log.info("Generated CUSTOM event [topVer=" + 
evtData.topologyVersion() + ", evt=" + msg + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
+                    }
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
+                else {
+                    U.warn(log, "Ignore custom event from unknown node: " + 
sndNodeId);
+
+                    zkClient.deleteIfExists(evtDataPath, -1);
                 }
 
                 evts.procCustEvt = evtE.getKey();
             }
 
+            long start = System.currentTimeMillis();
+
+            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1);
+
+            long time = System.currentTimeMillis() - start;
+
+            if (log.isInfoEnabled())
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + evts.topVer + ", saveTime=" + time + ']');
+
             onEventsUpdate(evts);
         }
     }
@@ -895,6 +913,9 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, 
DiscoverySpiCustomMessage msg) {
+        if (log.isInfoEnabled())
+            log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + 
msg.getClass().getSimpleName() + ']');
+
         ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId);
 
         assert sndNode != null : evtData;

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 162cf76..b0df770 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -240,9 +240,11 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCustomEventsSimple1_5_Nodes() throws Exception {
-        Ignite srv0 = startGrids(2);
+        Ignite srv0 = startGrids(5);
 
         srv0.createCache(new CacheConfiguration<>("c1"));
+
+        awaitPartitionMapExchange();
     }
 
     /**
@@ -562,6 +564,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
                 assertEquals(node.name(), cache.get(i));
             }
         }
+
+        awaitPartitionMapExchange();
     }
 
     /**
@@ -579,7 +583,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         for (Ignite node : G.allGrids())
             node.compute().broadcast(new DummyCallable(null));
 
-        //awaitPartitionMapExchange();
+        awaitPartitionMapExchange();
     }
 
     /**

Reply via email to