using scheduled thread pool per *monitors
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6189945b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6189945b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6189945b Branch: refs/heads/stratos-4.1.x Commit: 6189945b97b8bdacce63f4f6792684e33260812f Parents: d4b35c0 Author: Isuru Haththotuwa <[email protected]> Authored: Mon Dec 21 20:58:15 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Mon Dec 21 20:58:15 2015 +0530 ---------------------------------------------------------------------- .../applications/topic/ApplicationBuilder.java | 3 + .../AutoscalerTopologyEventReceiver.java | 2 + .../stratos/autoscaler/monitor/Monitor.java | 2 + .../monitor/cluster/ClusterMonitor.java | 13 +++-- .../component/ParentComponentMonitor.java | 11 +++- .../services/impl/AutoscalerServiceImpl.java | 5 ++ .../autoscaler/util/AutoscalerConstants.java | 3 +- .../stratos/autoscaler/util/AutoscalerUtil.java | 1 + .../threading/GracefulThreadPoolTerminator.java | 2 + .../common/threading/StratosThreadPool.java | 60 ++++++++++++++++++-- 10 files changed, 89 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java index 5fd4d5a..ec6b50a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java +++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java @@ -329,10 +329,12 @@ public class ApplicationBuilder { getAliasToActiveChildMonitorsMap().values()) { //destroying the drools monitor1.destroy(); + monitor1.cleanup(); } } // stopping application thread applicationMonitor.destroy(); + applicationMonitor.cleanup(); AutoscalerContext.getInstance().removeAppMonitor(applicationId); // Remove network partition algorithm context AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId); @@ -454,6 +456,7 @@ public class ApplicationBuilder { getAliasToActiveChildMonitorsMap().values()) { //destroying the drools monitor1.destroy(); + monitor1.cleanup(); } } org.apache.stratos.autoscaler.context.partition.network.NetworkPartitionContext networkPartitionContext = http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index c150a1f..5f123b2 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -204,6 +204,7 @@ public class AutoscalerTopologyEventReceiver { } //changing the status in the monitor, will notify its parent monitor monitor.destroy(); + monitor.cleanup(); 
monitor.notifyParentMonitor(ClusterStatus.Created, instanceId); } @@ -316,6 +317,7 @@ public class AutoscalerTopologyEventReceiver { if (!monitor.hasInstance() && (appMonitor != null && appMonitor.isTerminating())) { //Destroying and Removing the Cluster monitor monitor.destroy(); + monitor.cleanup(); AutoscalerContext.getInstance().removeClusterMonitor(clusterId); } http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java index c58ec41..2814958 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java @@ -216,4 +216,6 @@ public abstract class Monitor implements EventHandler, Runnable { public enum MonitorType { Application, Group, Cluster } + + public abstract void cleanup (); } http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index ead9130..ee936db 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -86,8 +86,8 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ClusterMonitor extends Monitor { private static final Log log = LogFactory.getLog(ClusterMonitor.class); - private final ScheduledThreadPoolExecutor scheduler; - private final ThreadPoolExecutor executor; + private ScheduledThreadPoolExecutor scheduler; + private ThreadPoolExecutor executor; protected boolean hasFaultyMember = false; protected ClusterContext clusterContext; protected String serviceType; @@ -107,11 +107,11 @@ public class ClusterMonitor extends Monitor { public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree, String deploymentPolicyId) { - scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID, 50); int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100); executor = StratosThreadPool.getExecutorService( AutoscalerConstants.MONITOR_THREAD_POOL_ID, ((int)Math.ceil(threadPoolSize/3)), threadPoolSize); this.clusterId = cluster.getClusterId(); + scheduler = StratosThreadPool.getScheduledExecutorService(clusterId, 2); readConfigurations(); this.groupScalingEnabledSubtree = groupScalingEnabledSubtree; this.setCluster(new Cluster(cluster)); @@ -147,7 +147,7 @@ public class ClusterMonitor extends Monitor { return MonitorType.Cluster; } - public void startScheduler() { + public synchronized void startScheduler() { schedulerFuture = scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS); } @@ -1540,4 +1540,9 @@ public class ClusterMonitor extends Monitor { public String getDeploymentPolicyId() { return deploymentPolicyId; } + + public void cleanup () { + // shutdown thread pools + StratosThreadPool.shutDownScheduledThreadPoolGracefully(clusterId); + } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java index 9efdf7c..24983a9 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java @@ -68,8 +68,7 @@ public abstract class ParentComponentMonitor extends Monitor { private static final Log log = LogFactory.getLog(ParentComponentMonitor.class); //Scheduler executor service to execute this monitor in a thread - private final ScheduledThreadPoolExecutor scheduler = StratosThreadPool.getScheduledExecutorService( - "autoscaler.monitor.scheduler.thread.pool", 100); + private final ScheduledThreadPoolExecutor scheduler; //The monitors dependency tree with all the start-able/kill-able dependencies protected DependencyTree startupDependencyTree; //The monitors dependency tree with all the scaling dependencies @@ -95,6 +94,7 @@ public abstract class ParentComponentMonitor extends Monitor { terminatingInstancesMap = new ConcurrentHashMap<String, List<String>>(); pendingChildMonitorsList = new ArrayList<String>(); id = component.getUniqueIdentifier(); + scheduler = StratosThreadPool.getScheduledExecutorService(id, 2); // Building the startup dependencies for this monitor within the immediate children startupDependencyTree = DependencyBuilder.getInstance().buildDependency(component); @@ -126,7 +126,7 @@ public abstract class 
ParentComponentMonitor extends Monitor { /** * Starting the scheduler for the monitor */ - public void startScheduler() { + public synchronized void startScheduler() { int monitoringIntervalMilliseconds = 60000; schedulerFuture = scheduler.scheduleAtFixedRate(this, 0, monitoringIntervalMilliseconds, TimeUnit.MILLISECONDS); @@ -1067,4 +1067,9 @@ public abstract class ParentComponentMonitor extends Monitor { public void removeMonitor(String id) { this.aliasToActiveChildMonitorsMap.remove(id); } + + public void cleanup () { + // shutdown thread pools + StratosThreadPool.shutDownScheduledThreadPoolGracefully(id); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java index 0943de0..a77c80a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java @@ -935,6 +935,7 @@ public class AutoscalerServiceImpl implements AutoscalerService { getAppMonitor(applicationId); if (applicationMonitor != null) { applicationMonitor.destroy(); + applicationMonitor.cleanup(); if (applicationMonitor.hasInstance()) { Map<String, Monitor> monitors = applicationMonitor. 
@@ -966,6 +967,7 @@ public class AutoscalerServiceImpl implements AutoscalerService { getClusterMonitor(clusterId); if (clusterMonitor != null) { clusterMonitor.destroy(); + clusterMonitor.cleanup(); } else { if (log.isDebugEnabled()) { log.debug(String.format( @@ -977,6 +979,8 @@ public class AutoscalerServiceImpl implements AutoscalerService { Collection<ClusterInstance> allClusterInstances = cluster.getClusterInstances(); if (allClusterInstances.isEmpty() && clusterMonitor != null) { + clusterMonitor.destroy(); + clusterMonitor.cleanup(); AutoscalerContext.getInstance().removeClusterMonitor(clusterId); } @@ -1007,6 +1011,7 @@ public class AutoscalerServiceImpl implements AutoscalerService { ApplicationContext applicationContext = AutoscalerContext.getInstance(). getApplicationContext(applicationId); applicationMonitor.destroy(); + applicationMonitor.cleanup(); AutoscalerContext.getInstance().removeAppMonitor(applicationId); // Remove network partition algorithm context AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId); http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java index ef12983..62ae532 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java @@ -38,7 +38,8 @@ public final class AutoscalerConstants { public static final String AUTOSCALER_SCHEDULER_ID = "autoscaler.scheduler.thread.pool"; public static 
final String SCHEDULER_THREAD_POOL_SIZE_KEY = "autoscaler.scheduler.thread.pool.size"; public static final int AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE = 5; - public static final int AUTOSCALER_THREAD_POOL_SIZE = 50; + //public static final int AUTOSCALER_THREAD_POOL_SIZE = 50; + public static final int AUTOSCALER_THREAD_POOL_SIZE = 10; public static final String COMPONENTS_CONFIG = CarbonUtils.getCarbonConfigDirPath() + File.separator + "stratos-config.xml"; http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java index b6ce0ed..e00f4e9 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java @@ -1027,6 +1027,7 @@ public class AutoscalerUtil { while(monitorsIter.hasNext()) { Monitor monitor = monitorsIter.next(); monitor.destroy(); + monitor.cleanup(); Iterator<Instance> instances = monitor.getInstances().iterator(); while(instances.hasNext()) { Instance instance = instances.next(); http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java index 70cda66..2ccfc45 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java @@ -41,6 +41,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> { @Override public String call() { // try to shut down gracefully + log.info("Attempting to gracefully shut down thread pool " + threadPoolId); executor.shutdown(); // wait 10 secs till terminated try { @@ -53,6 +54,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> { // interrupted, shutdown now executor.shutdownNow(); } + log.info("Successfully shut down thread pool " + threadPoolId); return threadPoolId; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 4eb8304..1f4e5c8 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -35,8 +35,8 @@ public class StratosThreadPool { private static final Log log = LogFactory.getLog(StratosThreadPool.class); - private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>(); - private static Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>(); + 
private static volatile Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>(); + private static volatile Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>(); private static final Object executorServiceMapLock = new Object(); private static final Object scheduledServiceMapLock = new Object(); @@ -47,14 +47,14 @@ public class StratosThreadPool { * @param maxSize Thread pool size * @return ThreadPoolExecutor */ - public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int - maxSize) { + public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int maxSize) { ThreadPoolExecutor executor = executorMap.get(identifier); if (executor == null) { synchronized (executorServiceMapLock) { if (executor == null) { + int taskQueueSize = initialSize > 3 ? (int)Math.ceil(initialSize/3) : 1; executor = new ThreadPoolExecutor(initialSize, maxSize, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new StratosThreadFactory(identifier)); + new LinkedBlockingQueue<Runnable>(taskQueueSize), new StratosThreadFactory(identifier)); executorMap.put(identifier, executor); log.info(String.format("Thread pool created: [type] Executor [id] %s " + "[initial size] %d [max size] %d", identifier, initialSize, maxSize)); @@ -88,6 +88,43 @@ public class StratosThreadPool { return scheduledExecutor; } + /** + * Stops the executor with the specified id in a graceful manner + * + * @param threadPoolId thread pool id + */ + public static void shutDownThreadPoolGracefully (String threadPoolId) { + + ThreadPoolExecutor executor = executorMap.get(threadPoolId); + if (executor == null) { + log.warn("No thread pool found for id " + threadPoolId + ", unable to shut down"); + return; + } + + new GracefulThreadPoolTerminator(threadPoolId, executor).call(); + removeThreadPoolFromCache(threadPoolId); + } + + /** + * Stops the scheduled executor with the specified id in a graceful manner + 
* + * @param threadPoolId thread pool id + */ + public static void shutDownScheduledThreadPoolGracefully (String threadPoolId) { + + ScheduledThreadPoolExecutor scheduledExecutor = scheduledExecutorMap.get(threadPoolId); + if (scheduledExecutor == null) { + log.warn("No scheduled thread pool found for id " + threadPoolId + ", unable to shut down"); + return; + } + + new GracefulThreadPoolTerminator(threadPoolId, scheduledExecutor).call(); + removeScheduledThreadPoolFromCache(threadPoolId); + } + + /** + * Stop all executors in a graceful manner + */ public static void shutDownAllThreadPoolsGracefully () { int threadPoolCount = executorMap.size(); @@ -129,6 +166,9 @@ public class StratosThreadPool { } } + /** + * Stop all scheduled executors in a graceful manner + */ public static void shutDownAllScheduledExecutorsGracefully () { int threadPoolCount = scheduledExecutorMap.size(); @@ -170,6 +210,11 @@ public class StratosThreadPool { } } + /** + * Removes the thread pool with id terminatedPoolId from the executorMap + * + * @param terminatedPoolId thread pool id + */ private static void removeThreadPoolFromCache(String terminatedPoolId) { if (executorMap.remove(terminatedPoolId) != null) { log.info("Thread pool [id] " + terminatedPoolId + " is successfully shut down" + @@ -177,6 +222,11 @@ public class StratosThreadPool { } } + /** + * Removes the scheduled thread pool with id terminatedPoolId from the scheduledExecutorMap + * + * @param terminatedPoolId thread pool id + */ private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) { if (scheduledExecutorMap.remove(terminatedPoolId) != null) { log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut down" +
