changing fixed thread pool to dynamic
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/cef4fe1b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/cef4fe1b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/cef4fe1b Branch: refs/heads/stratos-4.1.x Commit: cef4fe1b79b8bc9af36c05ad90137fbbe8995c3f Parents: d1fb292 Author: Isuru Haththotuwa <[email protected]> Authored: Fri Dec 4 15:59:28 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Mon Dec 7 18:49:55 2015 +0530 ---------------------------------------------------------------------- .../AutoscalerHealthStatEventReceiver.java | 18 ++++---- .../AutoscalerInitializerTopicReceiver.java | 18 ++++---- .../AutoscalerTopologyEventReceiver.java | 12 ++--- .../internal/AutoscalerServiceComponent.java | 23 +++++----- .../monitor/cluster/ClusterMonitor.java | 12 ++--- .../monitor/component/ApplicationMonitor.java | 12 ++--- .../monitor/component/GroupMonitor.java | 18 ++++---- .../component/ParentComponentMonitor.java | 9 ++-- .../publisher/DASScalingDecisionPublisher.java | 10 ++--- .../stratos/autoscaler/util/AutoscalerUtil.java | 3 +- .../autoscaler/util/ServiceReferenceHolder.java | 11 ++--- .../agent/test/JavaCartridgeAgentTest.java | 6 +-- .../context/CloudControllerContext.java | 11 ++--- .../CloudControllerServiceComponent.java | 22 +++++----- .../application/ApplicationEventReceiver.java | 8 ++-- .../status/ClusterStatusTopicReceiver.java | 12 ++--- .../initializer/InitializerTopicReceiver.java | 12 ++--- .../status/InstanceStatusTopicReceiver.java | 20 ++++----- .../impl/CloudControllerServiceImpl.java | 13 +++--- .../DASMemberInformationPublisher.java | 11 ++--- .../publisher/DASMemberStatusPublisher.java | 11 ++--- .../common/threading/StratosThreadFactory.java | 4 +- .../common/threading/StratosThreadPool.java | 26 ++++++----- .../extension/api/LoadBalancerExtension.java | 24 +++++----- .../internal/LoadBalancerServiceComponent.java | 46 ++++++++++---------- .../LoadBalancerStatisticsExecutor.java | 12 ++--- .../StratosManagerServiceComponent.java | 38 ++++++++++------ .../StratosManagerInitializerTopicReceiver.java | 10 ++--- .../message/receiver/StratosEventReceiver.java | 4 +- .../application/ApplicationsEventReceiver.java | 16 +++---- .../signup/ApplicationSignUpEventReceiver.java | 16 +++---- .../status/ClusterStatusEventReceiver.java | 14 +++--- .../mapping/DomainMappingEventReceiver.java | 15 +++---- .../health/stat/HealthStatEventReceiver.java | 14 +++--- .../initializer/InitializerEventReceiver.java | 16 +++---- .../notifier/InstanceNotifierEventReceiver.java | 6 +-- .../status/InstanceStatusEventReceiver.java | 12 ++--- .../receiver/tenant/TenantEventReceiver.java | 14 +++--- .../topology/TopologyEventReceiver.java | 16 +++---- .../MetadataApplicationEventReceiver.java | 6 +-- .../service/MetadataTopologyEventReceiver.java | 10 ++--- .../mock/iaas/services/impl/MockInstance.java | 13 +++--- .../org/apache/stratos/aws/extension/Main.java | 10 ++--- .../apache/stratos/haproxy/extension/Main.java | 8 ++-- .../org/apache/stratos/lvs/extension/Main.java | 8 ++-- .../apache/stratos/nginx/extension/Main.java | 8 ++-- .../tests/PythonAgentIntegrationTest.java | 3 +- .../integration/common/TopologyHandler.java | 19 ++++---- 48 files changed, 335 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java index 0b13500..42d7771 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -42,7 +42,7 @@ public class AutoscalerHealthStatEventReceiver { private boolean terminated = false; private HealthStatEventReceiver healthStatEventReceiver; - private ExecutorService executorService; + // private ExecutorService executorService; public AutoscalerHealthStatEventReceiver() { this.healthStatEventReceiver = HealthStatEventReceiver.getInstance(); @@ -50,7 +50,7 @@ public class AutoscalerHealthStatEventReceiver { } // public void execute() { -// healthStatEventReceiver.setExecutorService(executorService); +// healthStatEventReceiver.setExecutor(executor); // healthStatEventReceiver.execute(); // // if (log.isInfoEnabled()) { @@ -480,11 +480,11 @@ public class AutoscalerHealthStatEventReceiver { this.terminated = true; } - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public ExecutorService getExecutor() { +// return executorService; +// } +// +// public void setExecutor(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java index b330211..e1dbd7f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java @@ -31,7 +31,7 @@ import java.util.concurrent.ExecutorService; public class AutoscalerInitializerTopicReceiver { private static final Log log = LogFactory.getLog(AutoscalerInitializerTopicReceiver.class); private InitializerEventReceiver initializerEventReceiver; - private ExecutorService executorService; + //private ExecutorService executorService; public AutoscalerInitializerTopicReceiver() { this.initializerEventReceiver = InitializerEventReceiver.getInstance(); @@ -39,7 +39,7 @@ public class AutoscalerInitializerTopicReceiver { } // public void execute() { -// initializerEventReceiver.setExecutorService(executorService); +// initializerEventReceiver.setExecutor(executor); // initializerEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Cloud controller initializer topic receiver started"); @@ -62,11 +62,11 @@ public class AutoscalerInitializerTopicReceiver { }); } - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public ExecutorService getExecutor() { +// return executorService; +// } +// +// public void setExecutor(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index daa70ae..b0af42a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -57,7 +57,7 @@ public class AutoscalerTopologyEventReceiver { private TopologyEventReceiver topologyEventReceiver; private boolean terminated; private boolean topologyInitialized; - private ExecutorService executorService; + //private ExecutorService executorService; public AutoscalerTopologyEventReceiver() { this.topologyEventReceiver = TopologyEventReceiver.getInstance(); @@ -66,7 +66,7 @@ public class AutoscalerTopologyEventReceiver { // public void execute() { // //FIXME this activated before autoscaler deployer activated. -// // topologyEventReceiver.setExecutorService(getExecutorService()); +// // topologyEventReceiver.setExecutor(getExecutor()); // //topologyEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Autoscaler topology receiver thread started"); @@ -515,11 +515,11 @@ public class AutoscalerTopologyEventReceiver { // terminated = true; // } // -// public ExecutorService getExecutorService() { -// return executorService; +// public ExecutorService getExecutor() { +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutor(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java index bb28577..8219f71 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java @@ -59,6 +59,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -85,7 +86,7 @@ public class AutoscalerServiceComponent { private AutoscalerTopologyEventReceiver asTopologyReceiver; private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver; private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private ScheduledExecutorService scheduler; protected void activate(ComponentContext componentContext) throws Exception { @@ -96,8 +97,8 @@ public class AutoscalerServiceComponent { XMLConfiguration conf = ConfUtil.getInstance(AutoscalerConstants.COMPONENTS_CONFIG).getConfiguration(); int threadPoolSize = conf .getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE); - executorService = StratosThreadPool - .getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, threadPoolSize); + executor = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, ((int) Math + .ceil(threadPoolSize / 3)), threadPoolSize); int schedulerThreadPoolSize = conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY, AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE); @@ -114,7 +115,7 @@ public class AutoscalerServiceComponent { componentStartUpSynchronizer .waitForComponentActivation(Component.Autoscaler, Component.CloudController); - ServiceReferenceHolder.getInstance().setExecutorService(executorService); + ServiceReferenceHolder.getInstance().setExecutor(executor); CartridgeConfigFileReader.readProperties(); if (AutoscalerContext.getInstance().isClustered()) { Thread coordinatorElectorThread = new Thread() { @@ -136,7 +137,7 @@ public class AutoscalerServiceComponent { } }; coordinatorElectorThread.setName("Autoscaler coordinator elector thread"); - executorService.submit(coordinatorElectorThread); + executor.submit(coordinatorElectorThread); } else { executeCoordinatorTasks(); } @@ -173,7 +174,7 @@ public class AutoscalerServiceComponent { // Start topology receiver asTopologyReceiver = new AutoscalerTopologyEventReceiver(); -// asTopologyReceiver.setExecutorService(executorService); +// asTopologyReceiver.setExecutor(executor); //asTopologyReceiver.execute(); if (log.isDebugEnabled()) { log.debug("Topology receiver executor service started"); @@ -181,7 +182,7 @@ public class AutoscalerServiceComponent { // Start health stat receiver autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver(); -// autoscalerHealthStatEventReceiver.setExecutorService(executorService); +// autoscalerHealthStatEventReceiver.setExecutor(executor); // autoscalerHealthStatEventReceiver.execute(); if (log.isDebugEnabled()) { log.debug("Health statistics receiver thread started"); @@ -189,7 +190,7 @@ public class AutoscalerServiceComponent { // Start initializer receiver autoscalerInitializerTopicReceiver = new AutoscalerInitializerTopicReceiver(); -// autoscalerInitializerTopicReceiver.setExecutorService(executorService); +// autoscalerInitializerTopicReceiver.setExecutor(executor); // autoscalerInitializerTopicReceiver.execute(); // if (log.isDebugEnabled()) { // log.debug("Initializer receiver thread started"); @@ -275,9 +276,9 @@ public class AutoscalerServiceComponent { } private void shutdownExecutorService(String executorServiceId) { - ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1); - if (executorService != null) { - shutdownExecutorService(executorService); + ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); + if (executor != null) { + shutdownExecutorService(executor); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index 5976279..32bf037 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -87,7 +87,7 @@ public class ClusterMonitor extends Monitor { private static final Log log = LogFactory.getLog(ClusterMonitor.class); private final ScheduledExecutorService scheduler; - private final ExecutorService executorService; + private final ThreadPoolExecutor executor; protected boolean hasFaultyMember = false; protected ClusterContext clusterContext; protected String serviceType; @@ -109,8 +109,8 @@ public class ClusterMonitor extends Monitor { scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID, 50); int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100); - executorService = StratosThreadPool.getExecutorService( - AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize); + executor = StratosThreadPool.getExecutorService( + AutoscalerConstants.MONITOR_THREAD_POOL_ID, ((int)Math.ceil(threadPoolSize/3)), threadPoolSize); this.clusterId = cluster.getClusterId(); readConfigurations(); this.groupScalingEnabledSubtree = groupScalingEnabledSubtree; @@ -407,7 +407,7 @@ public class ClusterMonitor extends Monitor { } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } if (instance.getStatus() == ClusterStatus.Terminating) { @@ -453,7 +453,7 @@ public class ClusterMonitor extends Monitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } } } @@ -523,7 +523,7 @@ public class ClusterMonitor extends Monitor { } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java index e15b795..c94ef7e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java @@ -55,7 +55,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * ApplicationMonitor is to control the child monitors @@ -64,7 +64,7 @@ public class ApplicationMonitor extends ParentComponentMonitor { private static final Log log = LogFactory.getLog(ApplicationMonitor.class); - private final ExecutorService executorService; + private final ThreadPoolExecutor executor; //Flag to set whether application is terminating private boolean isTerminating; @@ -79,8 +79,8 @@ public class ApplicationMonitor extends ParentComponentMonitor { super(application); int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100); - this.executorService = StratosThreadPool.getExecutorService( - AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize); + this.executor = StratosThreadPool.getExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID, + ((int)Math.ceil(threadPoolSize/3)), threadPoolSize); //setting the appId for the application this.appId = application.getUniqueIdentifier(); @@ -164,7 +164,7 @@ public class ApplicationMonitor extends ParentComponentMonitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } private void handleScalingMaxOut(ParentInstanceContext instanceContext, @@ -476,7 +476,7 @@ public class ApplicationMonitor extends ParentComponentMonitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java index 341f264..d5cfc3f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java @@ -56,7 +56,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * This is GroupMonitor to monitor the group which consists of @@ -66,7 +66,7 @@ public class GroupMonitor extends ParentComponentMonitor { private static final Log log = LogFactory.getLog(GroupMonitor.class); - private final ExecutorService executorService; + private final ThreadPoolExecutor executor; //has scaling dependents protected boolean hasScalingDependents; //Indicates whether groupScaling enabled or not @@ -85,8 +85,8 @@ public class GroupMonitor extends ParentComponentMonitor { super(group); int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100); - this.executorService = StratosThreadPool.getExecutorService( - AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize); + this.executor = StratosThreadPool.getExecutorService( + AutoscalerConstants.MONITOR_THREAD_POOL_ID, ((int)Math.ceil(threadPoolSize/3)),threadPoolSize); this.groupScalingEnabled = group.isGroupScalingEnabled(); this.appId = appId; @@ -225,7 +225,7 @@ public class GroupMonitor extends ParentComponentMonitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } /** @@ -336,7 +336,7 @@ public class GroupMonitor extends ParentComponentMonitor { appId); } }; - executorService.execute(sendScaleMaxOut); + executor.execute(sendScaleMaxOut); } } else { if (log.isDebugEnabled()) { @@ -356,7 +356,7 @@ public class GroupMonitor extends ParentComponentMonitor { appId); } }; - executorService.execute(sendScaleMaxOut); + executor.execute(sendScaleMaxOut); } } @@ -488,7 +488,7 @@ public class GroupMonitor extends ParentComponentMonitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } @@ -594,7 +594,7 @@ public class GroupMonitor extends ParentComponentMonitor { } } }; - executorService.execute(monitoringRunnable); + executor.execute(monitoringRunnable); } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java index f05827a..1366a3f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java @@ -87,7 +87,7 @@ public abstract class ParentComponentMonitor extends Monitor { // future to cancel it when destroying monitors private ScheduledFuture<?> schedulerFuture; //Executor service to maintain the thread pool - private ExecutorService executorService; + private ThreadPoolExecutor executor; public ParentComponentMonitor(ParentComponent component) throws DependencyBuilderException { aliasToActiveChildMonitorsMap = new ConcurrentHashMap<String, Monitor>(); @@ -109,8 +109,9 @@ public abstract class ParentComponentMonitor extends Monitor { } // Create the executor service with identifier and thread pool size - executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, - AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, + ((int)Math.ceil(AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE/3)),AutoscalerConstants + .AUTOSCALER_THREAD_POOL_SIZE); networkPartitionContextsMap = new ConcurrentHashMap<String, NetworkPartitionContext>(); } @@ -864,7 +865,7 @@ public abstract class ParentComponentMonitor extends Monitor { ApplicationChildContext context, List<String> parentInstanceIds) { if (!this.aliasToActiveChildMonitorsMap.containsKey(context.getId())) { pendingChildMonitorsList.add(context.getId()); - executorService.submit(new MonitorAdder(parent, context, this.appId, parentInstanceIds)); + executor.submit(new MonitorAdder(parent, context, this.appId, parentInstanceIds)); String monitorTypeStr = AutoscalerUtil.findMonitorType(context).toString().toLowerCase(); log.info(String.format("Monitor scheduled: [type] %s [component] %s ", http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java index 52857d4..ad0e77b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java @@ -29,7 +29,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * MemberInfoPublisher to publish member information/metadata to DAS. @@ -42,12 +42,12 @@ public class DASScalingDecisionPublisher extends ScalingDecisionPublisher { private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10; - private ExecutorService executorService; + private ThreadPoolExecutor executor; public DASScalingDecisionPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID, - STATS_PUBLISHER_THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID, + ((int)Math.ceil(STATS_PUBLISHER_THREAD_POOL_SIZE/3)), STATS_PUBLISHER_THREAD_POOL_SIZE); } public static DASScalingDecisionPublisher getInstance() { @@ -168,7 +168,7 @@ public class DASScalingDecisionPublisher extends ScalingDecisionPublisher { } }; - executorService.execute(publisher); + executor.execute(publisher); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java index a639673..b6ce0ed 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java @@ -51,7 +51,6 @@ import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; import org.apache.stratos.autoscaler.pojo.policy.deployment.ApplicationPolicy; import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy; import org.apache.stratos.autoscaler.registry.RegistryManager; -import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; import org.apache.stratos.common.Property; import org.apache.stratos.common.client.CloudControllerServiceClient; @@ -843,7 +842,7 @@ public class AutoscalerUtil { AutoscalerContext autoscalerContext = AutoscalerContext.getInstance(); if (autoscalerContext.getAppMonitor(applicationId) == null) { autoscalerContext.addApplicationPendingMonitor(applicationId); - ServiceReferenceHolder.getInstance().getExecutorService().submit(new ApplicationMonitorAdder(applicationId)); + ServiceReferenceHolder.getInstance().getExecutor().submit(new ApplicationMonitorAdder(applicationId)); log.info(String.format("Monitor scheduled: [application] %s ", applicationId)); } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java index 4cc175c..7e10a3c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java @@ -32,6 +32,7 @@ import org.wso2.carbon.registry.core.Registry; import org.wso2.carbon.registry.core.session.UserRegistry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; public class ServiceReferenceHolder { @@ -43,7 +44,7 @@ public class ServiceReferenceHolder { private AxisConfiguration axisConfiguration; private DistributedObjectProvider distributedObjectProvider; private HazelcastInstance hazelcastInstance; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private ComponentStartUpSynchronizer componentStartUpSynchronizer; private ServiceReferenceHolder() { @@ -116,12 +117,12 @@ public class ServiceReferenceHolder { this.groupStatusProcessorChain = groupStatusProcessorChain; } - public ExecutorService getExecutorService() { - return executorService; + public ThreadPoolExecutor getExecutor() { + return executor; } - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; + public void setExecutor(ThreadPoolExecutor executor) { + this.executor = executor; } public ComponentStartUpSynchronizer getComponentStartUpSynchronizer() { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java index 903fa58..751e7c8 100644 --- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java +++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java @@ -121,13 +121,13 @@ public class JavaCartridgeAgentTest { String agentHome = setupJavaAgent(); - ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5); +// ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5); topologyEventReceiver = TopologyEventReceiver.getInstance(); - //topologyEventReceiver.setExecutorService(executorService); + //topologyEventReceiver.setExecutorService(executor); //topologyEventReceiver.execute(); instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance(); -// instanceStatusEventReceiver.setExecutorService(executorService); +// instanceStatusEventReceiver.setExecutorService(executor); // instanceStatusEventReceiver.execute(); instanceStarted = false; http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java index 4d28dd5..0771d5a 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java @@ -44,6 +44,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Lock; /** @@ -135,8 +136,8 @@ public class CloudControllerContext implements Serializable { /** * Thread pool used in this task to execute parallel tasks. */ - private transient ExecutorService executorService = StratosThreadPool - .getExecutorService("cloud.controller.context.thread.pool", 10); + private transient ThreadPoolExecutor executor = StratosThreadPool + .getExecutorService("cloud.controller.context.thread.pool", 5, 10); /** * Map of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s @@ -495,9 +496,9 @@ public class CloudControllerContext implements Serializable { return removed; } - public ExecutorService getExecutorService() { - return executorService; - } +// public ExecutorService getExecutor() { +// return executor; +// } public List<String> getPartitionIds(String cartridgeType) { return cartridgeTypeToPartitionIdsMap.get(cartridgeType); http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 267d5a8..710e400 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -49,6 +49,7 @@ import org.wso2.carbon.utils.ConfigurationContextService; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -82,7 +83,7 @@ public class CloudControllerServiceComponent { private InstanceStatusTopicReceiver instanceStatusTopicReceiver; private ApplicationEventReceiver applicationEventReceiver; private InitializerTopicReceiver initializerTopicReceiver; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private ScheduledExecutorService scheduler; protected void activate(final ComponentContext context) { @@ -90,7 +91,8 @@ public class CloudControllerServiceComponent { log.debug("Activating CloudControllerServiceComponent..."); } try { - executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(THREAD_POOL_ID, ((int)Math.ceil + (THREAD_POOL_SIZE/3)),THREAD_POOL_SIZE); scheduler = StratosThreadPool .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE); @@ -123,7 +125,7 @@ public class CloudControllerServiceComponent { } }; coordinatorElectorThread.setName("Cloud controller coordinator elector thread"); - executorService.submit(coordinatorElectorThread); + executor.submit(coordinatorElectorThread); } else { executeCoordinatorTasks(); } @@ -146,7 +148,7 @@ public class CloudControllerServiceComponent { private void executeCoordinatorTasks() { applicationEventReceiver = new ApplicationEventReceiver(); -// applicationEventReceiver.setExecutorService(executorService); +// applicationEventReceiver.setExecutorService(executor); // applicationEventReceiver.execute(); if (log.isInfoEnabled()) { @@ -154,7 +156,7 @@ public class CloudControllerServiceComponent { } clusterStatusTopicReceiver = new ClusterStatusTopicReceiver(); -// clusterStatusTopicReceiver.setExecutorService(executorService); +// clusterStatusTopicReceiver.setExecutorService(executor); // clusterStatusTopicReceiver.execute(); if (log.isInfoEnabled()) { @@ -162,7 +164,7 @@ public class CloudControllerServiceComponent { } instanceStatusTopicReceiver = new InstanceStatusTopicReceiver(); -// instanceStatusTopicReceiver.setExecutorService(executorService); +// instanceStatusTopicReceiver.setExecutorService(executor); // instanceStatusTopicReceiver.execute(); if (log.isInfoEnabled()) { @@ -170,7 +172,7 @@ public class CloudControllerServiceComponent { } initializerTopicReceiver = new InitializerTopicReceiver(); -// initializerTopicReceiver.setExecutorService(executorService); +// initializerTopicReceiver.setExecutorService(executor); // initializerTopicReceiver.execute(); if (log.isInfoEnabled()) { @@ -269,9 +271,9 @@ public class CloudControllerServiceComponent { } private void shutdownExecutorService(String executorServiceId) { - ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1); - if (executorService != null) { - shutdownExecutorService(executorService); + ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); + if (executor != null) { + shutdownExecutorService(executor); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java index 8da5575..26290b7 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java @@ -37,7 +37,7 @@ import java.util.concurrent.ExecutorService; public class ApplicationEventReceiver { private static final Log log = LogFactory.getLog(ApplicationEventReceiver.class); private ApplicationsEventReceiver applicationsEventReceiver; - // private ExecutorService executorService; + // private ExecutorService executor; public ApplicationEventReceiver() { this.applicationsEventReceiver = ApplicationsEventReceiver.getInstance(); @@ -48,7 +48,7 @@ public class ApplicationEventReceiver { // if (log.isInfoEnabled()) { // log.info("Cloud controller application event receiver thread started"); // } -// applicationsEventReceiver.setExecutorService(executorService); +// applicationsEventReceiver.setExecutorService(executor); // applicationsEventReceiver.execute(); // } @@ -76,7 +76,7 @@ public class ApplicationEventReceiver { }); } -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java index e0b9f62..a7c0947 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java @@ -34,7 +34,7 @@ public class ClusterStatusTopicReceiver { private ClusterStatusEventReceiver clusterStatusEventReceiver; //private boolean terminated; - //private ExecutorService executorService; + //private ExecutorService executor; public ClusterStatusTopicReceiver() { this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance(); @@ -42,7 +42,7 @@ public class ClusterStatusTopicReceiver { } // public void execute() { -// clusterStatusEventReceiver.setExecutorService(executorService); +// clusterStatusEventReceiver.setExecutorService(executor); // clusterStatusEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Cloud controller Cluster status thread started"); @@ -119,11 +119,11 @@ public class ClusterStatusTopicReceiver { // this.terminated = terminated; // } // -// public ExecutorService getExecutorService() { -// return executorService; +// public ExecutorService getExecutor() { +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java index 9a2c502..4d31058 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java @@ -31,7 +31,7 @@ import java.util.concurrent.ExecutorService; public class InitializerTopicReceiver { private static final Log log = LogFactory.getLog(InitializerTopicReceiver.class); private InitializerEventReceiver initializerEventReceiver; - private ExecutorService executorService; + //private ExecutorService executorService; public InitializerTopicReceiver() { this.initializerEventReceiver = InitializerEventReceiver.getInstance(); @@ -39,7 +39,7 @@ public class InitializerTopicReceiver { } // public void execute() { -// initializerEventReceiver.setExecutorService(executorService); +// initializerEventReceiver.setExecutorService(executor); // initializerEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Autoscaler initializer topic receiver started"); @@ -62,11 +62,11 @@ public class InitializerTopicReceiver { }); } -// public ExecutorService getExecutorService() { -// return executorService; +// public ExecutorService getExecutor() { +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java index bfa205b..a3bbc2b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java @@ -42,8 +42,8 @@ public class InstanceStatusTopicReceiver { private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class); private InstanceStatusEventReceiver statusEventReceiver; - private boolean terminated; - private ExecutorService executorService; + //private boolean terminated; + //private ExecutorService executorService; public InstanceStatusTopicReceiver() { this.statusEventReceiver = InstanceStatusEventReceiver.getInstance(); @@ -51,7 +51,7 @@ public class InstanceStatusTopicReceiver { } // public void execute() { -// statusEventReceiver.setExecutorService(executorService); +// statusEventReceiver.setExecutorService(executor); // statusEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Cloud controller application status thread started"); @@ -132,11 +132,11 @@ public class InstanceStatusTopicReceiver { } - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index d25ab52..8bd9ec2 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -43,7 +43,7 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Lock; /** @@ -60,10 +60,11 @@ public class CloudControllerServiceImpl implements CloudControllerService { public static final String KUBERNETES_CLUSTER = "cluster"; private CloudControllerContext cloudControllerContext = CloudControllerContext.getInstance(); - private ExecutorService executorService; + private ThreadPoolExecutor executor; public CloudControllerServiceImpl() { - executorService = StratosThreadPool.getExecutorService("cloud.controller.instance.manager.thread.pool", 50); + executor = StratosThreadPool.getExecutorService("cloud.controller.instance.manager.thread" + + ".pool", 20, 50); } @@ -496,7 +497,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { + "[member] %s [application-id] %s", instanceContext.getClusterId(), instanceContext.getClusterInstanceId(), memberId, applicationId)); } - executorService.execute(new InstanceCreator(memberContext, iaasProvider, payload.toString().getBytes())); + executor.execute(new InstanceCreator(memberContext, iaasProvider, payload.toString().getBytes())); return memberContext; } catch (Exception e) { @@ -675,7 +676,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } } - executorService.execute(new InstanceTerminator(memberContext)); + executor.execute(new InstanceTerminator(memberContext)); } finally { TopologyHolder.releaseWriteLock(); } @@ -726,7 +727,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } for (MemberContext memberContext : memberContexts) { - executorService.execute(new InstanceTerminator(memberContext)); + executor.execute(new InstanceTerminator(memberContext)); } return true; } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java index 8107e1b..0d38967 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java @@ -35,7 +35,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * MemberInfoPublisher to publish member information/metadata to DAS. @@ -48,12 +48,13 @@ public class DASMemberInformationPublisher extends MemberInformationPublisher { private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; private static final String VALUE_NOT_FOUND = "Value Not Found"; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private DASMemberInformationPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, - CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, + ((int)Math.ceil(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE/3)), CloudControllerConstants + .STATS_PUBLISHER_THREAD_POOL_SIZE); } public static DASMemberInformationPublisher getInstance() { @@ -161,7 +162,7 @@ public class DASMemberInformationPublisher extends MemberInformationPublisher { } } }; - executorService.execute(publisher); + executor.execute(publisher); } public static String handleNull(String param) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java index 6bb6251..f28d2c8 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java @@ -29,7 +29,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * Publishing member status to DAS. @@ -41,12 +41,13 @@ public class DASMemberStatusPublisher extends MemberStatusPublisher { private static final String DATA_STREAM_NAME = "member_lifecycle"; private static final String VERSION = "1.0.0"; private static final String DAS_THRIFT_CLIENT_NAME = "das"; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private DASMemberStatusPublisher() { super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME); - executorService = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, - CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID, + ((int)Math.ceil(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE/3)), CloudControllerConstants + .STATS_PUBLISHER_THREAD_POOL_SIZE); } public static DASMemberStatusPublisher getInstance() { @@ -133,7 +134,7 @@ public class DASMemberStatusPublisher extends MemberStatusPublisher { publish(payload.toArray()); } }; - executorService.execute(publisher); + executor.execute(publisher); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java index b6abbf3..98ddd37 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java @@ -19,7 +19,9 @@ package org.apache.stratos.common.threading; -public class StratosThreadFactory { +import java.util.concurrent.ThreadFactory; + +public class StratosThreadFactory implements ThreadFactory { private String prefix; private int counter; http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index d4531e2..459cd1d 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -33,7 +33,7 @@ public class StratosThreadPool { private static final Log log = LogFactory.getLog(StratosThreadPool.class); - private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>(); + private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>(); private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>(); private static Object executorServiceMapLock = new Object(); private static Object scheduledServiceMapLock = new Object(); @@ -42,21 +42,24 @@ public class StratosThreadPool { * Return the executor service based on the identifier and thread pool size * * @param identifier Thread pool identifier name - * @param threadPoolSize Thread pool size + * @param maxSize Thread pool size * @return ExecutorService */ - public static ExecutorService getExecutorService(String identifier, int threadPoolSize) { - ExecutorService executorService = executorServiceMap.get(identifier); - if (executorService == null) { + public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int + maxSize) { + ThreadPoolExecutor executor = executorMap.get(identifier); + if (executor == null) { synchronized (executorServiceMapLock) { - if (executorService == null) { - executorService = Executors.newFixedThreadPool(threadPoolSize); - executorServiceMap.put(identifier, executorService); - log.info(String.format("Thread pool created: [type] Executor Service [id] %s [size] %d", identifier, threadPoolSize)); + if (executor == null) { + executor = new ThreadPoolExecutor(initialSize, maxSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new StratosThreadFactory(identifier)); + executorMap.put(identifier, executor); + log.info(String.format("Thread pool created: [type] Executor [id] %s " + + "[initial size] %d [max size] %d", identifier, initialSize, maxSize)); } } } - return executorService; + return executor; } /** @@ -71,7 +74,8 @@ public class StratosThreadPool { if (scheduledExecutorService == null) { synchronized (scheduledServiceMapLock) { if (scheduledExecutorService == null) { - scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize); + scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize, + new StratosThreadFactory(identifier)); scheduledServiceMap.put(identifier, scheduledExecutorService); log.info(String.format("Thread pool created: [type] Scheduled Executor Service [id] %s [size] %d", identifier, threadPoolSize)); http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index ec1ddbc..838f0fc 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -89,10 +89,10 @@ public class LoadBalancerExtension { } // Start topology receiver thread - startTopologyEventReceiver(executorService, topologyProvider); - startApplicationEventReceiver(executorService); - startApplicationSignUpEventReceiver(executorService, topologyProvider); - startDomainMappingEventReceiver(executorService, topologyProvider); + startTopologyEventReceiver(topologyProvider); + startApplicationEventReceiver(); + startApplicationSignUpEventReceiver(topologyProvider); + startDomainMappingEventReceiver(topologyProvider); if (statsReader != null) { // Start stats notifier thread @@ -115,17 +115,16 @@ public class LoadBalancerExtension { /** * Start topology event receiver thread. * - * @param executorService executor service instance * @param topologyProvider topology provider instance */ - private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startTopologyEventReceiver(TopologyProvider topologyProvider) { // Enforcing the listener order in order execute extension listener later topologyEventReceiver = new LoadBalancerCommonTopologyEventReceiver(topologyProvider, false); // Add load-balancer extension event listener addTopologyEventListeners(topologyEventReceiver); // Add default topology provider event listeners topologyEventReceiver.addEventListeners(); -// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.setExecutorService(executor); // topologyEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Topology receiver thread started"); @@ -149,9 +148,9 @@ public class LoadBalancerExtension { } } - private void startApplicationEventReceiver(ExecutorService executorService) { + private void startApplicationEventReceiver() { applicationsEventReceiver = ApplicationsEventReceiver.getInstance(); -// applicationsEventReceiver.setExecutorService(executorService); +// applicationsEventReceiver.setExecutorService(executor); // applicationsEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Application event receiver thread started"); @@ -164,14 +163,14 @@ public class LoadBalancerExtension { * @param executorService executor service instance * @param topologyProvider topology receiver instance */ - private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) { // Enforcing the listener order in order execute extension listener later domainMappingEventReceiver = new LoadBalancerCommonDomainMappingEventReceiver(topologyProvider, false); // Add extension event listeners addDomainMappingsEventListeners(domainMappingEventReceiver); // Add default domain mapping event listeners domainMappingEventReceiver.addEventListeners(); -// domainMappingEventReceiver.setExecutorService(executorService); +// domainMappingEventReceiver.setExecutorService(executor); // domainMappingEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Domain mapping event receiver thread started"); @@ -198,10 +197,9 @@ public class LoadBalancerExtension { /** * Start application signup event receiver thread. * - * @param executorService executor service instance * @param topologyProvider topology provider instance */ - private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) { applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider); if (log.isInfoEnabled()) { log.info("Application signup event receiver thread started"); http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 3786af8..cb2297a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -64,6 +64,7 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants; import java.io.File; import java.util.Collection; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true" @@ -90,7 +91,7 @@ public class LoadBalancerServiceComponent { private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class); private boolean activated = false; - private ExecutorService executorService; + //private ThreadPoolExecutor executor; private LoadBalancerTopologyEventReceiver topologyEventReceiver; private TenantEventReceiver tenantEventReceiver; private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver; @@ -126,8 +127,9 @@ public class LoadBalancerServiceComponent { int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY, LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE); - executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID, - threadPoolSize); + // executor = StratosThreadPool.getExecutorService(LoadBalancerConstants + // .LOAD_BALANCER_THREAD_POOL_ID, + // ((int)Math.ceil(threadPoolSize/3)), threadPoolSize); TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider(); if (topologyProvider == null) { @@ -137,18 +139,18 @@ public class LoadBalancerServiceComponent { if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) { // Start tenant & application signup event receivers - startTenantEventReceiver(executorService); - startApplicationSignUpEventReceiver(executorService, topologyProvider); + startTenantEventReceiver(); + startApplicationSignUpEventReceiver(topologyProvider); } if (configuration.isDomainMappingEnabled()) { // Start domain mapping event receiver - startDomainMappingEventReceiver(executorService, topologyProvider); + startDomainMappingEventReceiver(topologyProvider); } if (configuration.isTopologyEventListenerEnabled()) { // Start topology receiver - startTopologyEventReceiver(executorService, topologyProvider); + startTopologyEventReceiver(topologyProvider); } if (configuration.isCepStatsPublisherEnabled()) { @@ -167,39 +169,39 @@ public class LoadBalancerServiceComponent { } } - private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startDomainMappingEventReceiver( TopologyProvider topologyProvider) { if (domainMappingEventReceiver != null) { return; } domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider); -// domainMappingEventReceiver.setExecutorService(executorService); +// domainMappingEventReceiver.setExecutorService(executor); // domainMappingEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Domain mapping event receiver thread started"); // } } - private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) { if (applicationSignUpEventReceiver != null) { return; } applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider); -// applicationSignUpEventReceiver.setExecutorService(executorService); +// applicationSignUpEventReceiver.setExecutorService(executor); // applicationSignUpEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Application signup event receiver thread started"); } } - private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startTopologyEventReceiver(TopologyProvider topologyProvider) { if (topologyEventReceiver != null) { return; } topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider); -// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.setExecutorService(executor); // topologyEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Topology receiver thread started"); @@ -223,10 +225,10 @@ public class LoadBalancerServiceComponent { } } - private void startTenantEventReceiver(ExecutorService executorService) { + private void startTenantEventReceiver() { tenantEventReceiver = TenantEventReceiver.getInstance(); -// tenantEventReceiver.setExecutorService(executorService); +// tenantEventReceiver.setExecutorService(executor); // tenantEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Tenant event receiver thread started"); @@ -293,13 +295,13 @@ public class LoadBalancerServiceComponent { } // Shutdown executor service - if (executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down load balancer executor service", e); - } - } +// if (executor != null) { +// try { +// executor.shutdownNow(); +// } catch (Exception e) { +// log.warn("An error occurred while shutting down load balancer executor service", e); +// } +// } } /** http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java index 04a0756..e625ec7 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java @@ -24,6 +24,7 @@ import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.util.LoadBalancerConstants; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * An executor service to asynchronously execute statistics update calls without blocking the @@ -35,11 +36,12 @@ public class LoadBalancerStatisticsExecutor { private static volatile LoadBalancerStatisticsExecutor instance; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private LoadBalancerStatisticsExecutor() { - executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID, - LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID, + ((int)Math.ceil(LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE/3)), LoadBalancerConstants + .LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE); } public static LoadBalancerStatisticsExecutor getInstance() { @@ -53,7 +55,7 @@ public class LoadBalancerStatisticsExecutor { return instance; } - public ExecutorService getService() { - return executorService; + public ThreadPoolExecutor getService() { + return executor; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index c4d68ae..0486d84 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -50,6 +50,7 @@ import org.wso2.carbon.utils.ConfigurationContextService; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -92,7 +93,7 @@ public class StratosManagerServiceComponent { private StratosManagerInstanceStatusEventReceiver instanceStatusEventReceiver; private StratosManagerApplicationEventReceiver applicationEventReceiver; private StratosManagerInitializerTopicReceiver initializerTopicReceiver; - private ExecutorService executorService; + private ThreadPoolExecutor executor; private ScheduledExecutorService scheduler; protected void activate(final ComponentContext componentContext) throws Exception { @@ -100,7 +101,8 @@ public class StratosManagerServiceComponent { log.debug("Activating StratosManagerServiceComponent..."); } try { - executorService = StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE); + executor = StratosThreadPool.getExecutorService(THREAD_POOL_ID, ((int)Math.ceil(THREAD_POOL_SIZE/3)) + , THREAD_POOL_SIZE); scheduler = StratosThreadPool .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, SCHEDULER_THREAD_POOL_SIZE); @@ -141,7 +143,7 @@ public class StratosManagerServiceComponent { } }; coordinatorElectorThread.setName("Stratos manager coordinator elector thread"); - executorService.submit(coordinatorElectorThread); + executor.submit(coordinatorElectorThread); } else { executeCoordinatorTasks(componentContext); } @@ -193,7 +195,7 @@ public class StratosManagerServiceComponent { private void initializeInitializerEventReceiver() { initializerTopicReceiver = new StratosManagerInitializerTopicReceiver(); -// initializerTopicReceiver.setExecutorService(executorService); +// initializerTopicReceiver.setExecutorService(executor); // initializerTopicReceiver.execute(); } @@ -202,7 +204,7 @@ public class StratosManagerServiceComponent { */ private void initializeInstanceStatusEventReceiver() { instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver(); -// instanceStatusEventReceiver.setExecutorService(executorService); +// instanceStatusEventReceiver.setExecutorService(executor); // instanceStatusEventReceiver.execute(); } @@ -211,7 +213,7 @@ public class StratosManagerServiceComponent { */ private void initializeTopologyEventReceiver() { topologyEventReceiver = new StratosManagerTopologyEventReceiver(); -// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.setExecutorService(executor); // topologyEventReceiver.execute(); } @@ -220,7 +222,7 @@ public class StratosManagerServiceComponent { */ private void initializeApplicationEventReceiver() { applicationEventReceiver = new StratosManagerApplicationEventReceiver(); -// applicationEventReceiver.setExecutorService(executorService); +// applicationEventReceiver.setExecutorService(executor); // applicationEventReceiver.execute(); } @@ -337,25 +339,33 @@ public class StratosManagerServiceComponent { EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName()); EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName()); - shutdownExecutorService(THREAD_POOL_ID); + shutdownExecutor(THREAD_POOL_ID); shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID); } - private void shutdownExecutorService(String executorServiceId) { - ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1); - if (executorService != null) { - shutdownExecutorService(executorService); + private void shutdownExecutor(String executorServiceId) { + ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); + if (executor != null) { + shutdownExecutor(executor); } } private void shutdownScheduledExecutorService(String executorServiceId) { ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); if (executorService != null) { - shutdownExecutorService(executorService); + shutdownExecutor(executorService); + } + } + + private void shutdownExecutor(ThreadPoolExecutor executor) { + try { + executor.shutdownNow(); + } catch (Exception e) { + log.warn("An error occurred while shutting down executor service", e); } } - private void shutdownExecutorService(ExecutorService executorService) { + private void shutdownExecutor(ExecutorService executorService) { try { executorService.shutdownNow(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java index c08e8e4..5f7bc53 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java @@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService; public class StratosManagerInitializerTopicReceiver { private static final Log log = LogFactory.getLog(StratosManagerInitializerTopicReceiver.class); private InitializerEventReceiver initializerEventReceiver; - //private ExecutorService executorService; + //private ExecutorService executor; private ApplicationSignUpHandler applicationSignUpHandler; public StratosManagerInitializerTopicReceiver() { @@ -43,7 +43,7 @@ public class StratosManagerInitializerTopicReceiver { } // public void execute() { -// initializerEventReceiver.setExecutorService(executorService); +// initializerEventReceiver.setExecutorService(executor); // initializerEventReceiver.execute(); // if (log.isInfoEnabled()) { // log.info("Stratos manager initializer topic receiver started"); @@ -82,10 +82,10 @@ public class StratosManagerInitializerTopicReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java index 5ac89e6..0b07940 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -19,11 +19,11 @@ package org.apache.stratos.messaging.message.receiver; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; public class StratosEventReceiver { - protected ExecutorService executorService; + protected ThreadPoolExecutor executor; public StratosEventReceiver () { }
