Repository: ambari Updated Branches: refs/heads/trunk 245fda34c -> 434ff1f25
AMBARI-20345 - Alert Event Publisher Executor Doesn't Scale Threads (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/434ff1f2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/434ff1f2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/434ff1f2 Branch: refs/heads/trunk Commit: 434ff1f25fa62e26709a500d70bf8012e531d5bb Parents: 245fda3 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Tue Mar 7 11:59:13 2017 -0500 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Tue Mar 7 12:53:46 2017 -0500 ---------------------------------------------------------------------- ambari-server/docs/configuration/index.md | 12 +++-- .../server/configuration/Configuration.java | 57 ++++++++++++++++++-- .../events/publishers/AlertEventPublisher.java | 21 +++++--- .../internal/AlertResourceProviderTest.java | 4 +- .../server/orm/dao/AlertsDAOCachedTest.java | 6 ++- .../services/CachedAlertFlushServiceTest.java | 7 ++- 6 files changed, 87 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/docs/configuration/index.md ---------------------------------------------------------------------- diff --git a/ambari-server/docs/configuration/index.md b/ambari-server/docs/configuration/index.md index 968db56..af962e1 100644 --- a/ambari-server/docs/configuration/index.md +++ b/ambari-server/docs/configuration/index.md @@ -54,7 +54,9 @@ The following are the properties which can be used to configure Ambari. | alerts.cache.enabled | Determines whether current alerts should be cached. Enabling this can increase performance on large cluster, but can also result in lost alert data if the cache is not flushed frequently. 
|`false` | | alerts.cache.flush.interval | The time, in minutes, after which cached alert information is flushed to the database<br/><br/> This property is related to `alerts.cache.enabled`. |`10` | | alerts.cache.size | The size of the alert cache.<br/><br/> This property is related to `alerts.cache.enabled`. |`50000` | -| alerts.execution.scheduler.maxThreads | The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.size.core | The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.size.max | The maximum number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.worker.size | The number of queued alerts allowed before discarding old alerts which have not been handled. The value should be increased as the size of the cluster increases. |`2000` | | alerts.snmp.dispatcher.udp.port | The UDP port to use when binding the SNMP dispatcher on Ambari Server startup. If no port is specified, then a random port will be used. | | | alerts.template.file | The full path to the XML file that describes the different alert templates. | | | ambari.display.url | The URL to use when creating messages which should include the Ambari Server URL.<br/><br/>The following are examples of valid values:<ul><li>`http://ambari.apache.org:8080`</ul> | | @@ -251,8 +253,8 @@ The following are the properties which can be used to configure Ambari. | server.requestlogs.namepattern | The pattern of request log file name |`ambari-access-yyyy_mm_dd.log` | | server.requestlogs.path | The location on the Ambari Server where request logs can be created.
| | | server.requestlogs.retaindays | The number of days that request log would be retained. |`15` | -| server.script.threads | The number of threads that should be allocated to run external script. |`4` | -| server.script.timeout | The time, in milliseconds, until an external script is killed. |`5000` | +| server.script.threads | The number of threads that should be allocated to run external script. |`20` | +| server.script.timeout | The time, in milliseconds, until an external script is killed. |`10000` | | server.stage.command.execution_type | How to execute commands in one stage |`STAGE` | | server.stages.parallel | Determines whether operations in different execution requests can be run concurrently. |`true` | | server.startup.web.timeout | The time, in seconds, that the ambari-server Python script will wait for Jetty to startup before returning an error code. |`50` | @@ -314,7 +316,9 @@ As the size of a cluster grows, some of the default property values may no longe ####Alerts & Notifications | Property Name | 10 Hosts | ~50 Hosts | ~100 Hosts | 500+ Hosts | | --- | --- | --- | --- | --- | -| alerts.execution.scheduler.maxThreads | 2 | 2 | 4 | 4 | +| alerts.execution.scheduler.threadpool.size.core | 2 | 2 | 4 | 4 | +| alerts.execution.scheduler.threadpool.size.max | 2 | 2 | 8 | 8 | +| alerts.execution.scheduler.threadpool.worker.size | 400 | 2000 | 4000 | 20000 | | alerts.cache.enabled | false | false | false | true | | alerts.cache.flush.interval | 10 | 10 | 10 | 10 | | alerts.cache.size | 50000 | 50000 | 100000 | 100000 | http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 0991814..df334c5 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -42,6 +42,7 @@ import java.util.Properties; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +55,7 @@ import org.apache.ambari.server.actionmanager.CommandExecutionType; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.Stage; import org.apache.ambari.server.controller.spi.PropertyProvider; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; import org.apache.ambari.server.orm.JPATableGenerationStrategy; import org.apache.ambari.server.orm.PersistenceType; @@ -2231,7 +2233,7 @@ public class Configuration { "alerts.template.file", null); /** - * The maximum number of threads which will handle published alert events. + * The core number of threads which will handle published alert events. */ @ConfigurationMarkdown( group = ConfigurationGrouping.ALERTS, @@ -2241,9 +2243,40 @@ public class Configuration { @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4"), @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "4") }, markdown = @Markdown( + description = "The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases.")) + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.size.core", 2); + + /** + * The maximum number of threads which will handle published alert events. 
+ */ + @ConfigurationMarkdown( + group = ConfigurationGrouping.ALERTS, + scaleValues = { + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "2"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "8"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "8") }, + markdown = @Markdown( description = "The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases.")) - public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS = new ConfigurationProperty<>( - "alerts.execution.scheduler.maxThreads", 2); + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.size.max", 2); + + /** + * The size of the {@link BlockingQueue} used to control the + * {@link ScalingThreadPoolExecutor} when handling incoming alert events. + */ + @ConfigurationMarkdown( + group = ConfigurationGrouping.ALERTS, + scaleValues = { + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "400"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2000"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4000"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "20000") }, + markdown = @Markdown( + description = "The number of queued alerts allowed before discarding old alerts which have not been handled. 
The value should be increased as the size of the cluster increases.")) + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.worker.size", 2000); /** * If {@code true} then alert information is cached and not immediately @@ -4713,10 +4746,24 @@ public class Configuration { } /** + * @return core thread pool size for AlertEventPublisher, default 2 + */ + public int getAlertEventPublisherCorePoolSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE)); + } + + /** * @return max thread pool size for AlertEventPublisher, default 2 */ - public int getAlertEventPublisherPoolSize() { - return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS)); + public int getAlertEventPublisherMaxPoolSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE)); + } + + /** + * @return the size of the queue for unhandled alert events + */ + public int getAlertEventPublisherWorkerQueueSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE)); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java index 6b6f674..64cc733 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java @@ -17,13 +17,13 @@ */ package org.apache.ambari.server.events.publishers; -import java.util.concurrent.LinkedBlockingQueue; import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; import org.apache.ambari.server.events.AlertEvent; import com.google.common.eventbus.AsyncEventBus; @@ -53,13 +53,18 @@ public final class AlertEventPublisher { */ @Inject public AlertEventPublisher(Configuration config) { - // create a fixed executor that is unbounded for now and will run rejected - // requests in the calling thread to prevent loss of alert handling - int poolsize = config.getAlertEventPublisherPoolSize(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(2, poolsize, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), - new AlertEventBusThreadFactory(), - new ThreadPoolExecutor.CallerRunsPolicy()); + // create an executor which will scale with the number of queued work items + // when handling incoming alerts; on rejection, the oldest queued alert is dropped + int corePoolSize = config.getAlertEventPublisherCorePoolSize(); + int maxPoolSize = config.getAlertEventPublisherMaxPoolSize(); + int workerQueueSize = config.getAlertEventPublisherWorkerQueueSize(); + + ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, + TimeUnit.SECONDS, workerQueueSize); + + executor.allowCoreThreadTimeOut(false); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + executor.setThreadFactory(new AlertEventBusThreadFactory()); m_eventBus = new AsyncEventBus(executor); } http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java +++
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java index 4b2bb7a..63b201b 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java @@ -737,7 +737,9 @@ public class AlertResourceProviderTest { expect(configuration.getDatabaseDriver()).andReturn(JDBC_IN_MEMORY_DRIVER).anyTimes(); expect(configuration.getDatabaseUser()).andReturn("sa").anyTimes(); expect(configuration.getDatabasePassword()).andReturn("").anyTimes(); - expect(configuration.getAlertEventPublisherPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); expect(configuration.getMasterKeyLocation()).andReturn(new File("/test")).anyTimes(); expect(configuration.getTemporaryKeyStoreRetentionMinutes()).andReturn(2l).anyTimes(); expect(configuration.isActivelyPurgeTemporaryKeyStore()).andReturn(true).anyTimes(); http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java index 02d942a..f47a997 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.server.orm.dao; +import static org.easymock.EasyMock.expect; + import java.util.List; import javax.persistence.EntityManager; @@ -274,7 +276,9 @@ public class AlertsDAOCachedTest { // required for since the configuration is being mocked Configuration configuration = EasyMock.createNiceMock(Configuration.class); - EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); EasyMock.expect(configuration.isAlertCacheEnabled()).andReturn(Boolean.TRUE).anyTimes(); EasyMock.expect(configuration.getAlertCacheSize()).andReturn(100).anyTimes(); EasyMock.replay(configuration); http://git-wip-us.apache.org/repos/asf/ambari/blob/434ff1f2/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java index 0ad67d0..992c7c4 100644 --- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.server.state.services; +import static org.easymock.EasyMock.expect; + import javax.persistence.EntityManager; import org.apache.ambari.server.configuration.Configuration; @@ -123,7 +125,10 @@ public class CachedAlertFlushServiceTest extends EasyMockSupport { // required for since the configuration is being mocked Configuration configuration = createNiceMock(Configuration.class); - EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes(); + expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); + EasyMock.replay(configuration);