Throttle new events generation (cherry picked from commit d29ea50)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9698f4d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9698f4d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9698f4d Branch: refs/heads/ignite-zk Commit: f9698f4d60dad816e33c6d6a430fd36312a01047 Parents: 64aa9ea Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 13 16:34:47 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 20 11:00:19 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperDiscoveryImpl.java | 84 +++++++++++++++++--- 1 file changed, 73 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f9698f4d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 7032fd8..a72d742 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -93,6 +93,12 @@ public class ZookeeperDiscoveryImpl { static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; /** */ + private static final String IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS = "IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS"; + + /** */ + private static final String IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE = "IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE"; + + /** */ final ZookeeperDiscoverySpi spi; /** */ @@ -1153,23 +1159,20 @@ public class ZookeeperDiscoveryImpl { TreeMap<Integer, String> alives = new TreeMap<>(); - TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(rtState.top.nodesByOrder); - - boolean newEvts = false; - for (String child : aliveNodes) { Integer internalId = ZkIgnitePaths.aliveInternalId(child); Object old = alives.put(internalId, child); assert old == null; - - if (!rtState.top.nodesByInternalId.containsKey(internalId)) { - if (processJoinOnCoordinator(curTop, internalId, child)) - newEvts = true; - } } + TreeMap<Long, ZookeeperClusterNode> curTop = new TreeMap<>(rtState.top.nodesByOrder); + + int newEvts = 0; + + final int MAX_NEW_EVTS = IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS, 100); + List<ZookeeperClusterNode> failedNodes = null; for (Map.Entry<Integer, ZookeeperClusterNode> e : rtState.top.nodesByInternalId.entrySet()) { @@ -1183,11 +1186,51 @@ public class ZookeeperDiscoveryImpl { generateNodeFail(curTop, failedNode); - newEvts = true; + newEvts++; + + if (newEvts == MAX_NEW_EVTS) { + saveAndProcessNewEvents(); + + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process, max event threshold reached [newEvts=" + newEvts + + ", totalEvts=" + rtState.evtsData.evts.size() + ']'); + } + + throttleNewEventsGeneration(); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } } } - if (newEvts) + for (Map.Entry<Integer, String> e : alives.entrySet()) { + Integer internalId = e.getKey(); + + if (!rtState.top.nodesByInternalId.containsKey(internalId)) { + if (processJoinOnCoordinator(curTop, internalId, e.getValue())) { + newEvts++; + + if (newEvts == MAX_NEW_EVTS) { + saveAndProcessNewEvents(); + + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process, max event threshold reached [newEvts=" + newEvts + + ", totalEvts=" + rtState.evtsData.evts.size() + ']'); + } + + throttleNewEventsGeneration(); + + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); + + return; + } + } + } + } + + if (newEvts > 0) saveAndProcessNewEvents(); if (failedNodes != null) @@ -1195,6 +1238,25 @@ public class ZookeeperDiscoveryImpl { } /** + * + */ + private void throttleNewEventsGeneration() { + long delay = IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE, 0); + + if (delay > 0) { + if (log.isInfoEnabled()) + log.info("Sleep delay before generate new events [delay=" + delay + ']'); + + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** * @param nodeId Node ID. * @param prefixId Path prefix. * @return Join data.