Repository: ambari
Updated Branches:
  refs/heads/trunk 245fda34c -> 434ff1f25


AMBARI-20345 - Alert Event Publisher Executor Doesn't Scale Threads (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/434ff1f2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/434ff1f2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/434ff1f2

Branch: refs/heads/trunk
Commit: 434ff1f25fa62e26709a500d70bf8012e531d5bb
Parents: 245fda3
Author: Jonathan Hurley <jhur...@hortonworks.com>
Authored: Tue Mar 7 11:59:13 2017 -0500
Committer: Jonathan Hurley <jhur...@hortonworks.com>
Committed: Tue Mar 7 12:53:46 2017 -0500

----------------------------------------------------------------------
 ambari-server/docs/configuration/index.md       | 12 +++--
 .../server/configuration/Configuration.java     | 57 ++++++++++++++++++--
 .../events/publishers/AlertEventPublisher.java  | 21 +++++---
 .../internal/AlertResourceProviderTest.java     |  4 +-
 .../server/orm/dao/AlertsDAOCachedTest.java     |  6 ++-
 .../services/CachedAlertFlushServiceTest.java   |  7 ++-
 6 files changed, 87 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/docs/configuration/index.md
----------------------------------------------------------------------
diff --git a/ambari-server/docs/configuration/index.md b/ambari-server/docs/configuration/index.md
index 968db56..af962e1 100644
--- a/ambari-server/docs/configuration/index.md
+++ b/ambari-server/docs/configuration/index.md
@@ -54,7 +54,9 @@ The following are the properties which can be used to configure Ambari.
 | alerts.cache.enabled | Determines whether current alerts should be cached. Enabling this can increase performance on large cluster, but can also result in lost alert data if the cache is not flushed frequently. |`false` | 
 | alerts.cache.flush.interval | The time, in minutes, after which cached alert information is flushed to the database<br/><br/> This property is related to `alerts.cache.enabled`. |`10` | 
 | alerts.cache.size | The size of the alert cache.<br/><br/> This property is related to `alerts.cache.enabled`. |`50000` | 
-| alerts.execution.scheduler.maxThreads | The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.size.core | The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.size.max | The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | 
+| alerts.execution.scheduler.threadpool.worker.size | The number of queued alerts allowed before discarding old alerts which have not been handled. The value should be increased as the size of the cluster increases. |`2000` | 
 | alerts.snmp.dispatcher.udp.port | The UDP port to use when binding the SNMP dispatcher on Ambari Server startup. If no port is specified, then a random port will be used. | | 
 | alerts.template.file | The full path to the XML file that describes the different alert templates. | | 
 | ambari.display.url | The URL to use when creating messages which should include the Ambari Server URL.<br/><br/>The following are examples of valid values:<ul><li>`http://ambari.apache.org:8080`</ul> | | 
@@ -251,8 +253,8 @@ The following are the properties which can be used to configure Ambari.
 | server.requestlogs.namepattern | The pattern of request log file name |`ambari-access-yyyy_mm_dd.log` | 
 | server.requestlogs.path | The location on the Ambari Server where request logs can be created. | | 
 | server.requestlogs.retaindays | The number of days that request log would be retained. |`15` | 
-| server.script.threads | The number of threads that should be allocated to run external script. |`4` | 
-| server.script.timeout | The time, in milliseconds, until an external script is killed. |`5000` | 
+| server.script.threads | The number of threads that should be allocated to run external script. |`20` | 
+| server.script.timeout | The time, in milliseconds, until an external script is killed. |`10000` | 
 | server.stage.command.execution_type | How to execute commands in one stage |`STAGE` | 
 | server.stages.parallel | Determines whether operations in different execution requests can be run concurrently. |`true` | 
 | server.startup.web.timeout | The time, in seconds, that the ambari-server Python script will wait for Jetty to startup before returning an error code. |`50` | 
@@ -314,7 +316,9 @@ As the size of a cluster grows, some of the default property values may no longe
 ####Alerts & Notifications
 | Property Name | 10 Hosts | ~50 Hosts | ~100 Hosts | 500+ Hosts | 
 | --- | --- | --- | --- | --- |
-| alerts.execution.scheduler.maxThreads | 2 | 2 | 4 | 4 | 
+| alerts.execution.scheduler.threadpool.size.core | 2 | 2 | 4 | 4 | 
+| alerts.execution.scheduler.threadpool.size.max | 2 | 2 | 8 | 8 | 
+| alerts.execution.scheduler.threadpool.worker.size | 400 | 2000 | 4000 | 20000 | 
 | alerts.cache.enabled | false | false | false | true | 
 | alerts.cache.flush.interval | 10 | 10 | 10 | 10 | 
 | alerts.cache.size | 50000 | 50000 | 100000 | 100000 | 

http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 0991814..df334c5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -42,6 +42,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +55,7 @@ import org.apache.ambari.server.actionmanager.CommandExecutionType;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
 import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
 import org.apache.ambari.server.orm.JPATableGenerationStrategy;
 import org.apache.ambari.server.orm.PersistenceType;
@@ -2231,7 +2233,7 @@ public class Configuration {
       "alerts.template.file", null);
 
   /**
-   * The maximum number of threads which will handle published alert events.
+   * The core number of threads which will handle published alert events.
    */
   @ConfigurationMarkdown(
       group = ConfigurationGrouping.ALERTS,
@@ -2241,9 +2243,40 @@ public class Configuration {
           @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4"),
           @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "4") 
},
       markdown = @Markdown(
+          description = "The core number of threads used to process incoming 
alert events. The value should be increased as the size of the cluster 
increases."))
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.size.core", 2);
+
+  /**
+   * The maximum number of threads which will handle published alert events.
+   */
+  @ConfigurationMarkdown(
+      group = ConfigurationGrouping.ALERTS,
+      scaleValues = {
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "2"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "8"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "8") 
},
+      markdown = @Markdown(
           description = "The number of threads used to handle alerts received 
from the Ambari Agents. The value should be increased as the size of the 
cluster increases."))
-  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS = new ConfigurationProperty<>(
-      "alerts.execution.scheduler.maxThreads", 2);
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.size.max", 2);
+
+  /**
+   * The size of the {@link BlockingQueue} used to control the
+   * {@link ScalingThreadPoolExecutor} when handling incoming alert events.
+   */
+  @ConfigurationMarkdown(
+      group = ConfigurationGrouping.ALERTS,
+      scaleValues = {
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "400"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = 
"2000"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = 
"4000"),
+          @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = 
"20000") },
+      markdown = @Markdown(
+          description = "The number of queued alerts allowed before discarding 
old alerts which have not been handled. The value should be increased as the 
size of the cluster increases."))
+  public static final ConfigurationProperty<Integer> 
ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE = new ConfigurationProperty<>(
+      "alerts.execution.scheduler.threadpool.worker.size", 2000);
 
   /**
    * If {@code true} then alert information is cached and not immediately
@@ -4713,10 +4746,24 @@ public class Configuration {
   }
 
   /**
+   * @return core thread pool size for AlertEventPublisher, default 2
+   */
+  public int getAlertEventPublisherCorePoolSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE));
+  }
+
+  /**
    * @return max thread pool size for AlertEventPublisher, default 2
    */
-  public int getAlertEventPublisherPoolSize() {
-    return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS));
+  public int getAlertEventPublisherMaxPoolSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE));
+  }
+
+  /**
+   * @return the size of the queue for unhandled alert events
+   */
+  public int getAlertEventPublisherWorkerQueueSize() {
+    return 
Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
index 6b6f674..64cc733 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java
@@ -17,13 +17,13 @@
  */
 package org.apache.ambari.server.events.publishers;
 
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
 import org.apache.ambari.server.events.AlertEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -53,13 +53,18 @@ public final class AlertEventPublisher {
    */
   @Inject
   public AlertEventPublisher(Configuration config) {
-    // create a fixed executor that is unbounded for now and will run rejected
-    // requests in the calling thread to prevent loss of alert handling
-    int poolsize = config.getAlertEventPublisherPoolSize();
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, poolsize, 0L,
-        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-        new AlertEventBusThreadFactory(),
-        new ThreadPoolExecutor.CallerRunsPolicy());
+    // create an executor which will scale with the number of queued work items
+    // when handling incoming alerts
+    int corePoolSize = config.getAlertEventPublisherCorePoolSize();
+    int maxPoolSize = config.getAlertEventPublisherMaxPoolSize();
+    int workerQueueSize = config.getAlertEventPublisherWorkerQueueSize();
+
+    ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(corePoolSize, 
maxPoolSize, 0L,
+        TimeUnit.SECONDS, workerQueueSize);
+
+    executor.allowCoreThreadTimeOut(false);
+    executor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardOldestPolicy());
+    executor.setThreadFactory(new AlertEventBusThreadFactory());
 
     m_eventBus = new AsyncEventBus(executor);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
index 4b2bb7a..63b201b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java
@@ -737,7 +737,9 @@ public class AlertResourceProviderTest {
       
expect(configuration.getDatabaseDriver()).andReturn(JDBC_IN_MEMORY_DRIVER).anyTimes();
       expect(configuration.getDatabaseUser()).andReturn("sa").anyTimes();
       expect(configuration.getDatabasePassword()).andReturn("").anyTimes();
-      
expect(configuration.getAlertEventPublisherPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
       expect(configuration.getMasterKeyLocation()).andReturn(new 
File("/test")).anyTimes();
       
expect(configuration.getTemporaryKeyStoreRetentionMinutes()).andReturn(2l).anyTimes();
       
expect(configuration.isActivelyPurgeTemporaryKeyStore()).andReturn(true).anyTimes();

http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
index 02d942a..f47a997 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import static org.easymock.EasyMock.expect;
+
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -274,7 +276,9 @@ public class AlertsDAOCachedTest {
 
       // required for since the configuration is being mocked
       Configuration configuration = 
EasyMock.createNiceMock(Configuration.class);
-      
EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
EasyMock.expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
       
EasyMock.expect(configuration.isAlertCacheEnabled()).andReturn(Boolean.TRUE).anyTimes();
       
EasyMock.expect(configuration.getAlertCacheSize()).andReturn(100).anyTimes();
       EasyMock.replay(configuration);

http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
index 0ad67d0..992c7c4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.state.services;
 
+import static org.easymock.EasyMock.expect;
+
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.configuration.Configuration;
@@ -123,7 +125,10 @@ public class CachedAlertFlushServiceTest extends EasyMockSupport {
 
       // required for since the configuration is being mocked
       Configuration configuration = createNiceMock(Configuration.class);
-      
EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes();
+      
expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes();
+      
expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes();
+
 
       EasyMock.replay(configuration);
 

Reply via email to