AMBARI-20345 - Alert Event Publisher Executor Doesn't Scale Threads 
(jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/07a30a12
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/07a30a12
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/07a30a12

Branch: refs/heads/branch-dev-logsearch
Commit: 07a30a12b6b7b091e735f9688ebc6934d4b9cf23
Parents: dff7754
Author: Jonathan Hurley <jhur...@hortonworks.com>
Authored: Tue Mar 7 11:59:13 2017 -0500
Committer: Jonathan Hurley <jhur...@hortonworks.com>
Committed: Tue Mar 7 14:43:11 2017 -0500

----------------------------------------------------------------------
 ambari-server/docs/configuration/index.md       |  12 +-
 .../server/configuration/Configuration.java     | 134 +++++++++++++------
 .../events/publishers/AlertEventPublisher.java  |  21 +--
 .../internal/AlertResourceProviderTest.java     |   4 +-
 .../server/orm/dao/AlertsDAOCachedTest.java     |   6 +-
 .../services/CachedAlertFlushServiceTest.java   |   7 +-
 6 files changed, 126 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/docs/configuration/index.md
----------------------------------------------------------------------
diff --git a/ambari-server/docs/configuration/index.md 
b/ambari-server/docs/configuration/index.md
index 58eb7c7..f836fc9 100644
--- a/ambari-server/docs/configuration/index.md
+++ b/ambari-server/docs/configuration/index.md
@@ -54,7 +54,9 @@ The following are the properties which can be used to 
configure Ambari.
 | alerts.cache.enabled | Determines whether current alerts should be cached. 
Enabling this can increase performance on large cluster, but can also result in 
lost alert data if the cache is not flushed frequently. |`false` | 
 | alerts.cache.flush.interval | The time, in minutes, after which cached alert 
information is flushed to the database<br/><br/> This property is related to 
`alerts.cache.enabled`. |`10` | 
 | alerts.cache.size | The size of the alert cache.<br/><br/> This property is 
related to `alerts.cache.enabled`. |`50000` | 
-| alerts.execution.scheduler.maxThreads | The number of threads used to handle 
alerts received from the Ambari Agents. The value should be increased as the 
size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.size.core | The core number of threads 
used to process incoming alert events. The value should be increased as the 
size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.size.max | The maximum number of threads 
used to handle alerts received from the Ambari Agents. The value should be 
increased as the size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.worker.size | The number of queued 
alerts allowed before discarding old alerts which have not been handled. The 
value should be increased as the size of the cluster increases. |`2000` | 
 | alerts.snmp.dispatcher.udp.port | The UDP port to use when binding the SNMP 
dispatcher on Ambari Server startup. If no port is specified, then a random 
port will be used. | | 
 | alerts.template.file | The full path to the XML file that describes the 
different alert templates. | | 
 | ambari.display.url | The URL to use when creating messages which should 
include the Ambari Server URL.<br/><br/>The following are examples of valid 
values:<ul><li>`http://ambari.apache.org:8080`</ul> | | 
@@ -244,8 +246,8 @@ The following are the properties which can be used to 
configure Ambari.
 | server.property-provider.threadpool.size.core | The core number of threads 
that will be used to retrieve data from federated datasources, such as remote 
JMX endpoints. |`16` | 
 | server.property-provider.threadpool.size.max | The maximum number of threads 
that will be used to retrieve data from federated datasources, such as remote 
JMX endpoints. |`32` | 
 | server.property-provider.threadpool.worker.size | The maximum size of 
pending federated datasource requests, such as those to JMX endpoints, which 
can be queued before rejecting new requests. |`2147483647` | 
-| server.script.threads | The number of threads that should be allocated to 
run external script. |`4` | 
-| server.script.timeout | The time, in milliseconds, until an external script 
is killed. |`5000` | 
+| server.script.threads | The number of threads that should be allocated to 
run external script. |`20` | 
+| server.script.timeout | The time, in milliseconds, until an external script 
is killed. |`10000` | 
 | server.stage.command.execution_type | How to execute commands in one stage 
|`STAGE` | 
 | server.stages.parallel | Determines whether operations in different 
execution requests can be run concurrently. |`true` | 
 | server.startup.web.timeout | The time, in seconds, that the ambari-server 
Python script will wait for Jetty to startup before returning an error code. 
|`50` | 
@@ -303,7 +305,9 @@ As the size of a cluster grows, some of the default 
property values may no longe
 ####Alerts & Notifications
 | Property Name | 10 Hosts | ~50 Hosts | ~100 Hosts | 500+ Hosts | 
 | --- | --- | --- | --- | --- |
-| alerts.execution.scheduler.maxThreads | 2 | 2 | 4 | 4 | 
+| alerts.execution.scheduler.threadpool.size.core | 2 | 2 | 4 | 4 | 
+| alerts.execution.scheduler.threadpool.size.max | 2 | 2 | 8 | 8 | 
+| alerts.execution.scheduler.threadpool.worker.size | 400 | 2000 | 4000 | 
20000 | 
 | alerts.cache.enabled | false | false | false | true | 
 | alerts.cache.flush.interval | 10 | 10 | 10 | 10 | 
 | alerts.cache.size | 50000 | 50000 | 100000 | 100000 | 

http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index cfb4d3d..405251e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -17,16 +17,36 @@
  */
 package org.apache.ambari.server.configuration;
 
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonPrimitive;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Field;
+import java.security.cert.CertificateException;
+import java.security.interfaces.RSAPublicKey;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.annotations.ExperimentalFeature;
 import org.apache.ambari.annotations.Markdown;
@@ -35,6 +55,7 @@ import 
org.apache.ambari.server.actionmanager.CommandExecutionType;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
 import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
 import org.apache.ambari.server.orm.JPATableGenerationStrategy;
 import org.apache.ambari.server.orm.PersistenceType;
@@ -70,34 +91,16 @@ import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.lang.reflect.Field;
-import java.security.cert.CertificateException;
-import java.security.interfaces.RSAPublicKey;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 /**
  * The {@link Configuration} class is used to read from the
@@ -2206,7 +2209,7 @@ public class Configuration {
       "alerts.template.file", null);
 
   /**
-   * The maximum number of threads which will handle published alert events.
+   * The core number of threads which will handle published alert events.
    */
   @ConfigurationMarkdown(
       group = ConfigurationGrouping.ALERTS,
@@ -2216,9 +2219,40 @@ public class Configuration {
           @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4"),
           @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "4") 
},
       markdown = @Markdown(
+          description = "The core number of threads used to process incoming 
alert events. The value should be increased as the size of the cluster 
increases."))
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.size.core", 2);
+
+  /**
+   * The maximum number of threads which will handle published alert events.
+   */
+  @ConfigurationMarkdown(
+      group = ConfigurationGrouping.ALERTS,
+      scaleValues = {
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "2"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "8"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "8") 
},
+      markdown = @Markdown(
           description = "The maximum number of threads used to handle alerts 
received from the Ambari Agents. The value should be increased as the size of 
the cluster increases."))
-  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS = new ConfigurationProperty<>(
-      "alerts.execution.scheduler.maxThreads", 2);
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.size.max", 2);
+
+  /**
+   * The size of the {@link BlockingQueue} used to control the
+   * {@link ScalingThreadPoolExecutor} when handling incoming alert events.
+   */
+  @ConfigurationMarkdown(
+      group = ConfigurationGrouping.ALERTS,
+      scaleValues = {
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "400"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = 
"2000"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = 
"4000"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = 
"20000") },
+      markdown = @Markdown(
+          description = "The number of queued alerts allowed before discarding 
old alerts which have not been handled. The value should be increased as the 
size of the cluster increases."))
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.worker.size", 2000);
 
   /**
    * If {@code true} then alert information is cached and not immediately
@@ -4511,10 +4545,24 @@ public class Configuration {
   }
 
   /**
+   * @return core thread pool size for AlertEventPublisher, default 2
+   */
+  public int getAlertEventPublisherCorePoolSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE));
+  }
+
+  /**
    * @return max thread pool size for AlertEventPublisher, default 2
    */
-  public int getAlertEventPublisherPoolSize() {
-    return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS));
+  public int getAlertEventPublisherMaxPoolSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE));
+  }
+
+  /**
+   * @return the size of the queue for unhandled alert events
+   */
+  public int getAlertEventPublisherWorkerQueueSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
index 5427a7b..78d5df6 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.events.publishers;
 
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -25,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.inject.Inject;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
 import org.apache.ambari.server.events.AlertEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -53,13 +53,18 @@ public final class AlertEventPublisher {
    */
   @Inject
   public AlertEventPublisher(Configuration config) {
-    // create a fixed executor that is unbounded for now and will run rejected
-    // requests in the calling thread to prevent loss of alert handling
-    int poolsize = config.getAlertEventPublisherPoolSize();
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, poolsize, 0L,
-        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-        new AlertEventBusThreadFactory(),
-        new ThreadPoolExecutor.CallerRunsPolicy());
+    // create an executor which will scale with the number of queued work items
+    // when handling incoming alerts
+    int corePoolSize = config.getAlertEventPublisherCorePoolSize();
+    int maxPoolSize = config.getAlertEventPublisherMaxPoolSize();
+    int workerQueueSize = config.getAlertEventPublisherWorkerQueueSize();
+
+    ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(corePoolSize, 
maxPoolSize, 0L,
+        TimeUnit.SECONDS, workerQueueSize);
+
+    executor.allowCoreThreadTimeOut(false);
+    executor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardOldestPolicy());
+    executor.setThreadFactory(new AlertEventBusThreadFactory());
 
     m_eventBus = new AsyncEventBus(executor);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
index 25b9821..f1c5c35 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
@@ -737,7 +737,9 @@ public class AlertResourceProviderTest {
       
expect(configuration.getDatabaseDriver()).andReturn(JDBC_IN_MEMORY_DRIVER).anyTimes();
       expect(configuration.getDatabaseUser()).andReturn("sa").anyTimes();
       expect(configuration.getDatabasePassword()).andReturn("").anyTimes();
-      
expect(configuration.getAlertEventPublisherPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
       expect(configuration.getMasterKeyLocation()).andReturn(new 
File("/test")).anyTimes();
       
expect(configuration.getTemporaryKeyStoreRetentionMinutes()).andReturn(2l).anyTimes();
       
expect(configuration.isActivelyPurgeTemporaryKeyStore()).andReturn(true).anyTimes();

http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
index 02d942a..f47a997 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import static org.easymock.EasyMock.expect;
+
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -274,7 +276,9 @@ public class AlertsDAOCachedTest {
 
       // required since the configuration is being mocked
       Configuration configuration = 
EasyMock.createNiceMock(Configuration.class);
-      
EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
       
EasyMock.expect(configuration.isAlertCacheEnabled()).andReturn(Boolean.TRUE).anyTimes();
       
EasyMock.expect(configuration.getAlertCacheSize()).andReturn(100).anyTimes();
       EasyMock.replay(configuration);

http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
index 0ad67d0..992c7c4 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.state.services;
 
+import static org.easymock.EasyMock.expect;
+
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.configuration.Configuration;
@@ -123,7 +125,10 @@ public class CachedAlertFlushServiceTest extends 
EasyMockSupport {
 
       // required since the configuration is being mocked
       Configuration configuration = createNiceMock(Configuration.class);
-      
EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes();
+      
expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
+
 
       EasyMock.replay(configuration);
 

Reply via email to