Repository: ignite Updated Branches: refs/heads/ignite-zk 97e851794 -> fcee8c846
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcee8c84 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcee8c84 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcee8c84 Branch: refs/heads/ignite-zk Commit: fcee8c846274890f8eee8cc8f3644cdda912dedd Parents: 97e8517 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 21 12:24:58 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 21 12:24:58 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZkDiscoveryCustomEventData.java | 7 ++- .../discovery/zk/internal/ZookeeperClient.java | 12 ++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 59 +++++++++++++------- .../zk/ZookeeperDiscoverySpiBasicTest.java | 8 ++- 4 files changed, 64 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java index cecb2dc..1346c24 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.UUID; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** @@ -37,17 +38,21 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { /** * @param evtId Event ID. * @param topVer Topology version. + * @param sndNodeId Sender node ID. * @param evtPath Event path. */ ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String evtPath) { super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer); + assert sndNodeId != null; + assert !F.isEmpty(evtPath); + this.sndNodeId = sndNodeId; this.evtPath = evtPath; } /** {@inheritDoc} */ @Override public String toString() { - return "CustomEventData [topVer=" + topologyVersion() + ']'; + return "CustomEventData [topVer=" + topologyVersion() + ", sndNode=" + sndNodeId + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index 95d548b..6393b90 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -252,6 +252,18 @@ public class ZookeeperClient implements Watcher { } } + void deleteIfExists(String path, int ver) + throws ZookeeperClientFailedException, InterruptedException + { + try { + delete(path, ver); + } + catch (KeeperException.NoNodeException e) { + // No-op if node does not exist. + } + } + + void delete(String path, int ver) throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index f351b35..ece71f9 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -518,12 +518,10 @@ public class ZookeeperDiscoveryImpl { zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1); - // TODO ZK: on crd do not need listen for events path. - long time = System.currentTimeMillis() - start; if (log.isInfoEnabled()) - log.info("Discovery coordinator saved new events [topVer=" + evts.topVer + ", saveTime=" + time + ']'); + log.info("Discovery coordinator saved new topology events [topVer=" + evts.topVer + ", saveTime=" + time + ']'); onEventsUpdate(evts); } @@ -709,35 +707,55 @@ public class ZookeeperDiscoveryImpl { for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) { UUID sndNodeId = ZkPaths.customEventSendNodeId(evtE.getValue()); - byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); + ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId); - DiscoverySpiCustomMessage msg; + String evtDataPath = zkPaths.customEvtsDir + "/" + evtE.getValue(); - try { - msg = unmarshal(evtBytes); + if (sndNode != null) { + byte[] evtBytes = zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue()); - evts.evtIdGen++; + DiscoverySpiCustomMessage msg; - ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( - evts.evtIdGen, - evts.topVer, - sndNodeId, - evtE.getValue()); + try { + msg = unmarshal(evtBytes); - evtData.msg = msg; + evts.evtIdGen++; - evts.addEvent(evtData); + ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData( + evts.evtIdGen, + evts.topVer, + sndNodeId, + evtE.getValue()); - if (log.isInfoEnabled()) - log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']'); + evtData.msg = msg; + + evts.addEvent(evtData); + + if (log.isInfoEnabled()) + log.info("Generated CUSTOM event [topVer=" + evtData.topologyVersion() + ", evt=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal custom discovery message: " + e, e); + } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal custom discovery message: " + e, e); + else { + U.warn(log, "Ignore custom event from unknown node: " + sndNodeId); + + zkClient.deleteIfExists(evtDataPath, -1); } evts.procCustEvt = evtE.getKey(); } + long start = System.currentTimeMillis(); + + zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1); + + long time = System.currentTimeMillis() - start; + + if (log.isInfoEnabled()) + log.info("Discovery coordinator saved new topology events [topVer=" + evts.topVer + ", saveTime=" + time + ']'); + onEventsUpdate(evts); } } @@ -895,6 +913,9 @@ public class ZookeeperDiscoveryImpl { */ @SuppressWarnings("unchecked") private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, DiscoverySpiCustomMessage msg) { + if (log.isInfoEnabled()) + log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg.getClass().getSimpleName() + ']'); + ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId); assert sndNode != null : evtData; http://git-wip-us.apache.org/repos/asf/ignite/blob/fcee8c84/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 162cf76..b0df770 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -240,9 +240,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testCustomEventsSimple1_5_Nodes() throws Exception { - Ignite srv0 = startGrids(2); + Ignite srv0 = startGrids(5); srv0.createCache(new CacheConfiguration<>("c1")); + + awaitPartitionMapExchange(); } /** @@ -562,6 +564,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assertEquals(node.name(), cache.get(i)); } } + + awaitPartitionMapExchange(); } /** @@ -579,7 +583,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { for (Ignite node : G.allGrids()) node.compute().broadcast(new DummyCallable(null)); - //awaitPartitionMapExchange(); + awaitPartitionMapExchange(); } /**