Throttle new events generation

(cherry picked from commit d29ea50)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9698f4d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9698f4d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9698f4d

Branch: refs/heads/ignite-zk
Commit: f9698f4d60dad816e33c6d6a430fd36312a01047
Parents: 64aa9ea
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Dec 13 16:34:47 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Dec 20 11:00:19 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 84 +++++++++++++++++---
 1 file changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9698f4d/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 7032fd8..a72d742 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -93,6 +93,12 @@ public class ZookeeperDiscoveryImpl {
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
 
     /** */
+    private static final String IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS = 
"IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS";
+
+    /** */
+    private static final String IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE = 
"IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE";
+
+    /** */
     final ZookeeperDiscoverySpi spi;
 
     /** */
@@ -1153,23 +1159,20 @@ public class ZookeeperDiscoveryImpl {
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
-        TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(rtState.top.nodesByOrder);
-
-        boolean newEvts = false;
-
         for (String child : aliveNodes) {
             Integer internalId = ZkIgnitePaths.aliveInternalId(child);
 
             Object old = alives.put(internalId, child);
 
             assert old == null;
-
-            if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
-                if (processJoinOnCoordinator(curTop, internalId, child))
-                    newEvts = true;
-            }
         }
 
+        TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(rtState.top.nodesByOrder);
+
+        int newEvts = 0;
+
+        final int MAX_NEW_EVTS = 
IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_MAX_EVTS, 100);
+
         List<ZookeeperClusterNode> failedNodes = null;
 
         for (Map.Entry<Integer, ZookeeperClusterNode> e : 
rtState.top.nodesByInternalId.entrySet()) {
@@ -1183,11 +1186,51 @@ public class ZookeeperDiscoveryImpl {
 
                 generateNodeFail(curTop, failedNode);
 
-                newEvts = true;
+                newEvts++;
+
+                if (newEvts == MAX_NEW_EVTS) {
+                    saveAndProcessNewEvents();
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Delay alive nodes change process, max event 
threshold reached [newEvts=" + newEvts +
+                            ", totalEvts=" + rtState.evtsData.evts.size() + 
']');
+                    }
+
+                    throttleNewEventsGeneration();
+
+                    rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+                    return;
+                }
             }
         }
 
-        if (newEvts)
+        for (Map.Entry<Integer, String> e : alives.entrySet()) {
+            Integer internalId = e.getKey();
+
+            if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+                if (processJoinOnCoordinator(curTop, internalId, 
e.getValue())) {
+                    newEvts++;
+
+                    if (newEvts == MAX_NEW_EVTS) {
+                        saveAndProcessNewEvents();
+
+                        if (log.isInfoEnabled()) {
+                            log.info("Delay alive nodes change process, max 
event threshold reached [newEvts=" + newEvts +
+                                ", totalEvts=" + rtState.evtsData.evts.size() 
+ ']');
+                        }
+
+                        throttleNewEventsGeneration();
+
+                        
rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, 
rtState.watcher);
+
+                        return;
+                    }
+                }
+            }
+        }
+
+        if (newEvts > 0)
             saveAndProcessNewEvents();
 
         if (failedNodes != null)
@@ -1195,6 +1238,25 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * Sleeps for the delay (ms) obtained from IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE, 0) when it is positive; restores the thread's interrupt flag if the sleep is interrupted.
+     */
+    private void throttleNewEventsGeneration() {
+        long delay = 
IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_EVTS_THROTTLE, 0);
+
+        if (delay > 0) {
+            if (log.isInfoEnabled())
+                log.info("Sleep delay before generate new events [delay=" + 
delay + ']');
+
+            try {
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param prefixId Path prefix.
      * @return Join data.

Reply via email to