changing fixed thread pool to dynamic

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/cef4fe1b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/cef4fe1b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/cef4fe1b

Branch: refs/heads/stratos-4.1.x
Commit: cef4fe1b79b8bc9af36c05ad90137fbbe8995c3f
Parents: d1fb292
Author: Isuru Haththotuwa <[email protected]>
Authored: Fri Dec 4 15:59:28 2015 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Mon Dec 7 18:49:55 2015 +0530

----------------------------------------------------------------------
 .../AutoscalerHealthStatEventReceiver.java      | 18 ++++----
 .../AutoscalerInitializerTopicReceiver.java     | 18 ++++----
 .../AutoscalerTopologyEventReceiver.java        | 12 ++---
 .../internal/AutoscalerServiceComponent.java    | 23 +++++-----
 .../monitor/cluster/ClusterMonitor.java         | 12 ++---
 .../monitor/component/ApplicationMonitor.java   | 12 ++---
 .../monitor/component/GroupMonitor.java         | 18 ++++----
 .../component/ParentComponentMonitor.java       |  9 ++--
 .../publisher/DASScalingDecisionPublisher.java  | 10 ++---
 .../stratos/autoscaler/util/AutoscalerUtil.java |  3 +-
 .../autoscaler/util/ServiceReferenceHolder.java | 11 ++---
 .../agent/test/JavaCartridgeAgentTest.java      |  6 +--
 .../context/CloudControllerContext.java         | 11 ++---
 .../CloudControllerServiceComponent.java        | 22 +++++-----
 .../application/ApplicationEventReceiver.java   |  8 ++--
 .../status/ClusterStatusTopicReceiver.java      | 12 ++---
 .../initializer/InitializerTopicReceiver.java   | 12 ++---
 .../status/InstanceStatusTopicReceiver.java     | 20 ++++-----
 .../impl/CloudControllerServiceImpl.java        | 13 +++---
 .../DASMemberInformationPublisher.java          | 11 ++---
 .../publisher/DASMemberStatusPublisher.java     | 11 ++---
 .../common/threading/StratosThreadFactory.java  |  4 +-
 .../common/threading/StratosThreadPool.java     | 26 ++++++-----
 .../extension/api/LoadBalancerExtension.java    | 24 +++++-----
 .../internal/LoadBalancerServiceComponent.java  | 46 ++++++++++----------
 .../LoadBalancerStatisticsExecutor.java         | 12 ++---
 .../StratosManagerServiceComponent.java         | 38 ++++++++++------
 .../StratosManagerInitializerTopicReceiver.java | 10 ++---
 .../message/receiver/StratosEventReceiver.java  |  4 +-
 .../application/ApplicationsEventReceiver.java  | 16 +++----
 .../signup/ApplicationSignUpEventReceiver.java  | 16 +++----
 .../status/ClusterStatusEventReceiver.java      | 14 +++---
 .../mapping/DomainMappingEventReceiver.java     | 15 +++----
 .../health/stat/HealthStatEventReceiver.java    | 14 +++---
 .../initializer/InitializerEventReceiver.java   | 16 +++----
 .../notifier/InstanceNotifierEventReceiver.java |  6 +--
 .../status/InstanceStatusEventReceiver.java     | 12 ++---
 .../receiver/tenant/TenantEventReceiver.java    | 14 +++---
 .../topology/TopologyEventReceiver.java         | 16 +++----
 .../MetadataApplicationEventReceiver.java       |  6 +--
 .../service/MetadataTopologyEventReceiver.java  | 10 ++---
 .../mock/iaas/services/impl/MockInstance.java   | 13 +++---
 .../org/apache/stratos/aws/extension/Main.java  | 10 ++---
 .../apache/stratos/haproxy/extension/Main.java  |  8 ++--
 .../org/apache/stratos/lvs/extension/Main.java  |  8 ++--
 .../apache/stratos/nginx/extension/Main.java    |  8 ++--
 .../tests/PythonAgentIntegrationTest.java       |  3 +-
 .../integration/common/TopologyHandler.java     | 19 ++++----
 48 files changed, 335 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 0b13500..42d7771 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -42,7 +42,7 @@ public class AutoscalerHealthStatEventReceiver {
     private boolean terminated = false;
 
     private HealthStatEventReceiver healthStatEventReceiver;
-    private ExecutorService executorService;
+   // private ExecutorService executorService;
 
     public AutoscalerHealthStatEventReceiver() {
         this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
@@ -50,7 +50,7 @@ public class AutoscalerHealthStatEventReceiver {
     }
 
 //    public void execute() {
-//        healthStatEventReceiver.setExecutorService(executorService);
+//        healthStatEventReceiver.setExecutor(executor);
 //        healthStatEventReceiver.execute();
 //
 //        if (log.isInfoEnabled()) {
@@ -480,11 +480,11 @@ public class AutoscalerHealthStatEventReceiver {
         this.terminated = true;
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public ExecutorService getExecutor() {
+//        return executorService;
+//    }
+//
+//    public void setExecutor(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
index b330211..e1dbd7f 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/initializer/AutoscalerInitializerTopicReceiver.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ExecutorService;
 public class AutoscalerInitializerTopicReceiver {
     private static final Log log = 
LogFactory.getLog(AutoscalerInitializerTopicReceiver.class);
     private InitializerEventReceiver initializerEventReceiver;
-    private ExecutorService executorService;
+    //private ExecutorService executorService;
 
     public AutoscalerInitializerTopicReceiver() {
         this.initializerEventReceiver = InitializerEventReceiver.getInstance();
@@ -39,7 +39,7 @@ public class AutoscalerInitializerTopicReceiver {
     }
 
 //    public void execute() {
-//        initializerEventReceiver.setExecutorService(executorService);
+//        initializerEventReceiver.setExecutor(executor);
 //        initializerEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Cloud controller initializer topic receiver started");
@@ -62,11 +62,11 @@ public class AutoscalerInitializerTopicReceiver {
         });
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public ExecutorService getExecutor() {
+//        return executorService;
+//    }
+//
+//    public void setExecutor(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index daa70ae..b0af42a 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -57,7 +57,7 @@ public class AutoscalerTopologyEventReceiver {
     private TopologyEventReceiver topologyEventReceiver;
     private boolean terminated;
     private boolean topologyInitialized;
-    private ExecutorService executorService;
+    //private ExecutorService executorService;
 
     public AutoscalerTopologyEventReceiver() {
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
@@ -66,7 +66,7 @@ public class AutoscalerTopologyEventReceiver {
 
 //    public void execute() {
 //        //FIXME this activated before autoscaler deployer activated.
-//       // topologyEventReceiver.setExecutorService(getExecutorService());
+//       // topologyEventReceiver.setExecutor(getExecutor());
 //        //topologyEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Autoscaler topology receiver thread started");
@@ -515,11 +515,11 @@ public class AutoscalerTopologyEventReceiver {
 //        terminated = true;
 //    }
 //
-//    public ExecutorService getExecutorService() {
-//        return executorService;
+//    public ExecutorService getExecutor() {
+//        return executor;
 //    }
 //
-//    public void setExecutorService(ExecutorService executorService) {
-//        this.executorService = executorService;
+//    public void setExecutor(ExecutorService executor) {
+//        this.executor = executor;
 //    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index bb28577..8219f71 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -59,6 +59,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -85,7 +86,7 @@ public class AutoscalerServiceComponent {
     private AutoscalerTopologyEventReceiver asTopologyReceiver;
     private AutoscalerHealthStatEventReceiver 
autoscalerHealthStatEventReceiver;
     private AutoscalerInitializerTopicReceiver 
autoscalerInitializerTopicReceiver;
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
     private ScheduledExecutorService scheduler;
 
     protected void activate(ComponentContext componentContext) throws 
Exception {
@@ -96,8 +97,8 @@ public class AutoscalerServiceComponent {
             XMLConfiguration conf = 
ConfUtil.getInstance(AutoscalerConstants.COMPONENTS_CONFIG).getConfiguration();
             int threadPoolSize = conf
                     .getInt(AutoscalerConstants.THREAD_POOL_SIZE_KEY, 
AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
-            executorService = StratosThreadPool
-                    
.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID, 
threadPoolSize);
+            executor = 
StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
 ((int) Math
+                    .ceil(threadPoolSize / 3)), threadPoolSize);
 
             int schedulerThreadPoolSize = 
conf.getInt(AutoscalerConstants.SCHEDULER_THREAD_POOL_SIZE_KEY,
                     AutoscalerConstants.AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE);
@@ -114,7 +115,7 @@ public class AutoscalerServiceComponent {
                         componentStartUpSynchronizer
                                 
.waitForComponentActivation(Component.Autoscaler, Component.CloudController);
 
-                        
ServiceReferenceHolder.getInstance().setExecutorService(executorService);
+                        
ServiceReferenceHolder.getInstance().setExecutor(executor);
                         CartridgeConfigFileReader.readProperties();
                         if (AutoscalerContext.getInstance().isClustered()) {
                             Thread coordinatorElectorThread = new Thread() {
@@ -136,7 +137,7 @@ public class AutoscalerServiceComponent {
                                 }
                             };
                             coordinatorElectorThread.setName("Autoscaler 
coordinator elector thread");
-                            executorService.submit(coordinatorElectorThread);
+                            executor.submit(coordinatorElectorThread);
                         } else {
                             executeCoordinatorTasks();
                         }
@@ -173,7 +174,7 @@ public class AutoscalerServiceComponent {
 
         // Start topology receiver
         asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-//        asTopologyReceiver.setExecutorService(executorService);
+//        asTopologyReceiver.setExecutor(executor);
         //asTopologyReceiver.execute();
         if (log.isDebugEnabled()) {
             log.debug("Topology receiver executor service started");
@@ -181,7 +182,7 @@ public class AutoscalerServiceComponent {
 
         // Start health stat receiver
         autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
-//        
autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+//        autoscalerHealthStatEventReceiver.setExecutor(executor);
 //        autoscalerHealthStatEventReceiver.execute();
         if (log.isDebugEnabled()) {
             log.debug("Health statistics receiver thread started");
@@ -189,7 +190,7 @@ public class AutoscalerServiceComponent {
 
         // Start initializer receiver
         autoscalerInitializerTopicReceiver = new 
AutoscalerInitializerTopicReceiver();
-//        
autoscalerInitializerTopicReceiver.setExecutorService(executorService);
+//        autoscalerInitializerTopicReceiver.setExecutor(executor);
 //        autoscalerInitializerTopicReceiver.execute();
 //        if (log.isDebugEnabled()) {
 //            log.debug("Initializer receiver thread started");
@@ -275,9 +276,9 @@ public class AutoscalerServiceComponent {
     }
 
     private void shutdownExecutorService(String executorServiceId) {
-        ExecutorService executorService = 
StratosThreadPool.getExecutorService(executorServiceId, 1);
-        if (executorService != null) {
-            shutdownExecutorService(executorService);
+        ThreadPoolExecutor executor = 
StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+        if (executor != null) {
+            shutdownExecutorService(executor);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 5976279..32bf037 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -87,7 +87,7 @@ public class ClusterMonitor extends Monitor {
 
     private static final Log log = LogFactory.getLog(ClusterMonitor.class);
     private final ScheduledExecutorService scheduler;
-    private final ExecutorService executorService;
+    private final ThreadPoolExecutor executor;
     protected boolean hasFaultyMember = false;
     protected ClusterContext clusterContext;
     protected String serviceType;
@@ -109,8 +109,8 @@ public class ClusterMonitor extends Monitor {
 
         scheduler = 
StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID,
 50);
         int threadPoolSize = 
Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100);
-        executorService = StratosThreadPool.getExecutorService(
-                AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize);
+        executor = StratosThreadPool.getExecutorService(
+                AutoscalerConstants.MONITOR_THREAD_POOL_ID, 
((int)Math.ceil(threadPoolSize/3)), threadPoolSize);
         this.clusterId = cluster.getClusterId();
         readConfigurations();
         this.groupScalingEnabledSubtree = groupScalingEnabledSubtree;
@@ -407,7 +407,7 @@ public class ClusterMonitor extends Monitor {
 
                             }
                         };
-                        executorService.execute(monitoringRunnable);
+                        executor.execute(monitoringRunnable);
                     }
 
                     if (instance.getStatus() == ClusterStatus.Terminating) {
@@ -453,7 +453,7 @@ public class ClusterMonitor extends Monitor {
                                 }
                             }
                         };
-                        executorService.execute(monitoringRunnable);
+                        executor.execute(monitoringRunnable);
                     }
                 }
             }
@@ -523,7 +523,7 @@ public class ClusterMonitor extends Monitor {
             }
 
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
 
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java
index e15b795..c94ef7e 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java
@@ -55,7 +55,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * ApplicationMonitor is to control the child monitors
@@ -64,7 +64,7 @@ public class ApplicationMonitor extends 
ParentComponentMonitor {
 
     private static final Log log = LogFactory.getLog(ApplicationMonitor.class);
 
-    private final ExecutorService executorService;
+    private final ThreadPoolExecutor executor;
 
     //Flag to set whether application is terminating
     private boolean isTerminating;
@@ -79,8 +79,8 @@ public class ApplicationMonitor extends 
ParentComponentMonitor {
         super(application);
 
         int threadPoolSize = 
Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100);
-        this.executorService = StratosThreadPool.getExecutorService(
-                AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize);
+        this.executor = 
StratosThreadPool.getExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID,
+                ((int)Math.ceil(threadPoolSize/3)), threadPoolSize);
 
         //setting the appId for the application
         this.appId = application.getUniqueIdentifier();
@@ -164,7 +164,7 @@ public class ApplicationMonitor extends 
ParentComponentMonitor {
                 }
             }
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
     }
 
     private void handleScalingMaxOut(ParentInstanceContext instanceContext,
@@ -476,7 +476,7 @@ public class ApplicationMonitor extends 
ParentComponentMonitor {
                 }
             }
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
 
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
index 341f264..d5cfc3f 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
@@ -56,7 +56,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * This is GroupMonitor to monitor the group which consists of
@@ -66,7 +66,7 @@ public class GroupMonitor extends ParentComponentMonitor {
 
     private static final Log log = LogFactory.getLog(GroupMonitor.class);
 
-    private final ExecutorService executorService;
+    private final ThreadPoolExecutor executor;
     //has scaling dependents
     protected boolean hasScalingDependents;
     //Indicates whether groupScaling enabled or not
@@ -85,8 +85,8 @@ public class GroupMonitor extends ParentComponentMonitor {
         super(group);
 
         int threadPoolSize = 
Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100);
-        this.executorService = StratosThreadPool.getExecutorService(
-                AutoscalerConstants.MONITOR_THREAD_POOL_ID, threadPoolSize);
+        this.executor = StratosThreadPool.getExecutorService(
+                AutoscalerConstants.MONITOR_THREAD_POOL_ID, 
((int)Math.ceil(threadPoolSize/3)),threadPoolSize);
 
         this.groupScalingEnabled = group.isGroupScalingEnabled();
         this.appId = appId;
@@ -225,7 +225,7 @@ public class GroupMonitor extends ParentComponentMonitor {
                 }
             }
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
     }
 
     /**
@@ -336,7 +336,7 @@ public class GroupMonitor extends ParentComponentMonitor {
                                     appId);
                         }
                     };
-                    executorService.execute(sendScaleMaxOut);
+                    executor.execute(sendScaleMaxOut);
                 }
             } else {
                 if (log.isDebugEnabled()) {
@@ -356,7 +356,7 @@ public class GroupMonitor extends ParentComponentMonitor {
                             appId);
                 }
             };
-            executorService.execute(sendScaleMaxOut);
+            executor.execute(sendScaleMaxOut);
         }
     }
 
@@ -488,7 +488,7 @@ public class GroupMonitor extends ParentComponentMonitor {
                 }
             }
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
 
     }
 
@@ -594,7 +594,7 @@ public class GroupMonitor extends ParentComponentMonitor {
                 }
             }
         };
-        executorService.execute(monitoringRunnable);
+        executor.execute(monitoringRunnable);
 
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index f05827a..1366a3f 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -87,7 +87,7 @@ public abstract class ParentComponentMonitor extends Monitor {
     // future to cancel it when destroying monitors
     private ScheduledFuture<?> schedulerFuture;
     //Executor service to maintain the thread pool
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     public ParentComponentMonitor(ParentComponent component) throws 
DependencyBuilderException {
         aliasToActiveChildMonitorsMap = new ConcurrentHashMap<String, 
Monitor>();
@@ -109,8 +109,9 @@ public abstract class ParentComponentMonitor extends 
Monitor {
         }
 
         // Create the executor service with identifier and thread pool size
-        executorService = 
StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
-                AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE);
+        executor = 
StratosThreadPool.getExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID,
+                
((int)Math.ceil(AutoscalerConstants.AUTOSCALER_THREAD_POOL_SIZE/3)),AutoscalerConstants
+                        .AUTOSCALER_THREAD_POOL_SIZE);
         networkPartitionContextsMap = new ConcurrentHashMap<String, 
NetworkPartitionContext>();
     }
 
@@ -864,7 +865,7 @@ public abstract class ParentComponentMonitor extends 
Monitor {
                                              ApplicationChildContext context, 
List<String> parentInstanceIds) {
         if (!this.aliasToActiveChildMonitorsMap.containsKey(context.getId())) {
             pendingChildMonitorsList.add(context.getId());
-            executorService.submit(new MonitorAdder(parent, context, 
this.appId, parentInstanceIds));
+            executor.submit(new MonitorAdder(parent, context, this.appId, 
parentInstanceIds));
 
             String monitorTypeStr = 
AutoscalerUtil.findMonitorType(context).toString().toLowerCase();
             log.info(String.format("Monitor scheduled: [type] %s [component] 
%s ",

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
index 52857d4..ad0e77b 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -29,7 +29,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
@@ -42,12 +42,12 @@ public class DASScalingDecisionPublisher extends 
ScalingDecisionPublisher {
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
     private static final int STATS_PUBLISHER_THREAD_POOL_SIZE = 10;
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     public DASScalingDecisionPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
-                STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executor = 
StratosThreadPool.getExecutorService(AutoscalerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                ((int)Math.ceil(STATS_PUBLISHER_THREAD_POOL_SIZE/3)), 
STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASScalingDecisionPublisher getInstance() {
@@ -168,7 +168,7 @@ public class DASScalingDecisionPublisher extends 
ScalingDecisionPublisher {
             }
 
         };
-        executorService.execute(publisher);
+        executor.execute(publisher);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index a639673..b6ce0ed 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -51,7 +51,6 @@ import 
org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
 import org.apache.stratos.autoscaler.pojo.policy.deployment.ApplicationPolicy;
 import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
 import org.apache.stratos.autoscaler.registry.RegistryManager;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.Properties;
 import org.apache.stratos.common.Property;
 import org.apache.stratos.common.client.CloudControllerServiceClient;
@@ -843,7 +842,7 @@ public class AutoscalerUtil {
         AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
         if (autoscalerContext.getAppMonitor(applicationId) == null) {
             autoscalerContext.addApplicationPendingMonitor(applicationId);
-            
ServiceReferenceHolder.getInstance().getExecutorService().submit(new 
ApplicationMonitorAdder(applicationId));
+            ServiceReferenceHolder.getInstance().getExecutor().submit(new 
ApplicationMonitorAdder(applicationId));
 
             log.info(String.format("Monitor scheduled: [application] %s ", 
applicationId));
         } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
index 4cc175c..7e10a3c 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java
@@ -32,6 +32,7 @@ import org.wso2.carbon.registry.core.Registry;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class ServiceReferenceHolder {
 
@@ -43,7 +44,7 @@ public class ServiceReferenceHolder {
     private AxisConfiguration axisConfiguration;
     private DistributedObjectProvider distributedObjectProvider;
     private HazelcastInstance hazelcastInstance;
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
     private ComponentStartUpSynchronizer componentStartUpSynchronizer;
 
     private ServiceReferenceHolder() {
@@ -116,12 +117,12 @@ public class ServiceReferenceHolder {
         this.groupStatusProcessorChain = groupStatusProcessorChain;
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
+    public ThreadPoolExecutor getExecutor() {
+        return executor;
     }
 
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
+    public void setExecutor(ThreadPoolExecutor executor) {
+        this.executor = executor;
     }
 
     public ComponentStartUpSynchronizer getComponentStartUpSynchronizer() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
 
b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 903fa58..751e7c8 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -121,13 +121,13 @@ public class JavaCartridgeAgentTest {
 
         String agentHome = setupJavaAgent();
 
-        ExecutorService executorService = 
StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5);
+//        ExecutorService executorService = 
StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5);
         topologyEventReceiver = TopologyEventReceiver.getInstance();
-        //topologyEventReceiver.setExecutorService(executorService);
+        //topologyEventReceiver.setExecutorService(executor);
         //topologyEventReceiver.execute();
 
         instanceStatusEventReceiver = 
InstanceStatusEventReceiver.getInstance();
-//        instanceStatusEventReceiver.setExecutorService(executorService);
+//        instanceStatusEventReceiver.setExecutorService(executor);
 //        instanceStatusEventReceiver.execute();
 
         instanceStarted = false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 4d28dd5..0771d5a 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -44,6 +44,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -135,8 +136,8 @@ public class CloudControllerContext implements Serializable 
{
     /**
      * Thread pool used in this task to execute parallel tasks.
      */
-    private transient ExecutorService executorService = StratosThreadPool
-            .getExecutorService("cloud.controller.context.thread.pool", 10);
+    private transient ThreadPoolExecutor executor = StratosThreadPool
+            .getExecutorService("cloud.controller.context.thread.pool", 5, 10);
 
     /**
      * Map of registered {@link 
org.apache.stratos.cloud.controller.domain.Cartridge}s
@@ -495,9 +496,9 @@ public class CloudControllerContext implements Serializable 
{
         return removed;
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
+//    public ExecutorService getExecutor() {
+//        return executor;
+//    }
 
     public List<String> getPartitionIds(String cartridgeType) {
         return cartridgeTypeToPartitionIdsMap.get(cartridgeType);

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 267d5a8..710e400 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -49,6 +49,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -82,7 +83,7 @@ public class CloudControllerServiceComponent {
     private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
     private ApplicationEventReceiver applicationEventReceiver;
     private InitializerTopicReceiver initializerTopicReceiver;
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
     private ScheduledExecutorService scheduler;
 
     protected void activate(final ComponentContext context) {
@@ -90,7 +91,8 @@ public class CloudControllerServiceComponent {
             log.debug("Activating CloudControllerServiceComponent...");
         }
         try {
-            executorService = 
StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+            executor = StratosThreadPool.getExecutorService(THREAD_POOL_ID, 
((int)Math.ceil
+                    (THREAD_POOL_SIZE/3)),THREAD_POOL_SIZE);
             scheduler = StratosThreadPool
                     .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, 
SCHEDULER_THREAD_POOL_SIZE);
 
@@ -123,7 +125,7 @@ public class CloudControllerServiceComponent {
                                 }
                             };
                             coordinatorElectorThread.setName("Cloud controller 
coordinator elector thread");
-                            executorService.submit(coordinatorElectorThread);
+                            executor.submit(coordinatorElectorThread);
                         } else {
                             executeCoordinatorTasks();
                         }
@@ -146,7 +148,7 @@ public class CloudControllerServiceComponent {
 
     private void executeCoordinatorTasks() {
         applicationEventReceiver = new ApplicationEventReceiver();
-//        applicationEventReceiver.setExecutorService(executorService);
+//        applicationEventReceiver.setExecutorService(executor);
 //        applicationEventReceiver.execute();
 
         if (log.isInfoEnabled()) {
@@ -154,7 +156,7 @@ public class CloudControllerServiceComponent {
         }
 
         clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-//        clusterStatusTopicReceiver.setExecutorService(executorService);
+//        clusterStatusTopicReceiver.setExecutorService(executor);
 //        clusterStatusTopicReceiver.execute();
 
         if (log.isInfoEnabled()) {
@@ -162,7 +164,7 @@ public class CloudControllerServiceComponent {
         }
 
         instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-//        instanceStatusTopicReceiver.setExecutorService(executorService);
+//        instanceStatusTopicReceiver.setExecutorService(executor);
 //        instanceStatusTopicReceiver.execute();
 
         if (log.isInfoEnabled()) {
@@ -170,7 +172,7 @@ public class CloudControllerServiceComponent {
         }
 
         initializerTopicReceiver = new InitializerTopicReceiver();
-//        initializerTopicReceiver.setExecutorService(executorService);
+//        initializerTopicReceiver.setExecutorService(executor);
 //        initializerTopicReceiver.execute();
 
         if (log.isInfoEnabled()) {
@@ -269,9 +271,9 @@ public class CloudControllerServiceComponent {
     }
 
     private void shutdownExecutorService(String executorServiceId) {
-        ExecutorService executorService = 
StratosThreadPool.getExecutorService(executorServiceId, 1);
-        if (executorService != null) {
-            shutdownExecutorService(executorService);
+        ThreadPoolExecutor executor = 
StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+        if (executor != null) {
+            shutdownExecutorService(executor);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
index 8da5575..26290b7 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java
@@ -37,7 +37,7 @@ import java.util.concurrent.ExecutorService;
 public class ApplicationEventReceiver {
     private static final Log log = 
LogFactory.getLog(ApplicationEventReceiver.class);
     private ApplicationsEventReceiver applicationsEventReceiver;
-   // private ExecutorService executorService;
+   // private ExecutorService executor;
 
     public ApplicationEventReceiver() {
         this.applicationsEventReceiver = 
ApplicationsEventReceiver.getInstance();
@@ -48,7 +48,7 @@ public class ApplicationEventReceiver {
 //        if (log.isInfoEnabled()) {
 //            log.info("Cloud controller application event receiver thread 
started");
 //        }
-//        applicationsEventReceiver.setExecutorService(executorService);
+//        applicationsEventReceiver.setExecutorService(executor);
 //        applicationsEventReceiver.execute();
 //    }
 
@@ -76,7 +76,7 @@ public class ApplicationEventReceiver {
         });
     }
 
-//    public void setExecutorService(ExecutorService executorService) {
-//        this.executorService = executorService;
+//    public void setExecutorService(ExecutorService executor) {
+//        this.executor = executor;
 //    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index e0b9f62..a7c0947 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -34,7 +34,7 @@ public class ClusterStatusTopicReceiver {
 
     private ClusterStatusEventReceiver clusterStatusEventReceiver;
     //private boolean terminated;
-    //private ExecutorService executorService;
+    //private ExecutorService executor;
 
     public ClusterStatusTopicReceiver() {
         this.clusterStatusEventReceiver = 
ClusterStatusEventReceiver.getInstance();
@@ -42,7 +42,7 @@ public class ClusterStatusTopicReceiver {
     }
 
 //    public void execute() {
-//        clusterStatusEventReceiver.setExecutorService(executorService);
+//        clusterStatusEventReceiver.setExecutorService(executor);
 //        clusterStatusEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Cloud controller Cluster status thread started");
@@ -119,11 +119,11 @@ public class ClusterStatusTopicReceiver {
 //        this.terminated = terminated;
 //    }
 //
-//    public ExecutorService getExecutorService() {
-//        return executorService;
+//    public ExecutorService getExecutor() {
+//        return executor;
 //    }
 //
-//    public void setExecutorService(ExecutorService executorService) {
-//        this.executorService = executorService;
+//    public void setExecutorService(ExecutorService executor) {
+//        this.executor = executor;
 //    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
index 9a2c502..4d31058 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/initializer/InitializerTopicReceiver.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ExecutorService;
 public class InitializerTopicReceiver {
     private static final Log log = 
LogFactory.getLog(InitializerTopicReceiver.class);
     private InitializerEventReceiver initializerEventReceiver;
-    private ExecutorService executorService;
+    //private ExecutorService executorService;
 
     public InitializerTopicReceiver() {
         this.initializerEventReceiver = InitializerEventReceiver.getInstance();
@@ -39,7 +39,7 @@ public class InitializerTopicReceiver {
     }
 
 //    public void execute() {
-//        initializerEventReceiver.setExecutorService(executorService);
+//        initializerEventReceiver.setExecutorService(executor);
 //        initializerEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Autoscaler initializer topic receiver started");
@@ -62,11 +62,11 @@ public class InitializerTopicReceiver {
         });
     }
 
-//    public ExecutorService getExecutorService() {
-//        return executorService;
+//    public ExecutorService getExecutor() {
+//        return executor;
 //    }
 //
-//    public void setExecutorService(ExecutorService executorService) {
-//        this.executorService = executorService;
+//    public void setExecutorService(ExecutorService executor) {
+//        this.executor = executor;
 //    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index bfa205b..a3bbc2b 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -42,8 +42,8 @@ public class InstanceStatusTopicReceiver {
     private static final Log log = 
LogFactory.getLog(InstanceStatusTopicReceiver.class);
 
     private InstanceStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
-    private ExecutorService executorService;
+    //private boolean terminated;
+    //private ExecutorService executorService;
 
     public InstanceStatusTopicReceiver() {
         this.statusEventReceiver = InstanceStatusEventReceiver.getInstance();
@@ -51,7 +51,7 @@ public class InstanceStatusTopicReceiver {
     }
 
 //    public void execute() {
-//        statusEventReceiver.setExecutorService(executorService);
+//        statusEventReceiver.setExecutorService(executor);
 //        statusEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Cloud controller application status thread started");
@@ -132,11 +132,11 @@ public class InstanceStatusTopicReceiver {
 
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index d25ab52..8bd9ec2 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -43,7 +43,7 @@ import 
org.wso2.carbon.registry.core.exceptions.RegistryException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -60,10 +60,11 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     public static final String KUBERNETES_CLUSTER = "cluster";
 
     private CloudControllerContext cloudControllerContext = 
CloudControllerContext.getInstance();
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     public CloudControllerServiceImpl() {
-        executorService = 
StratosThreadPool.getExecutorService("cloud.controller.instance.manager.thread.pool",
 50);
+        executor = 
StratosThreadPool.getExecutorService("cloud.controller.instance.manager.thread" 
+
+                ".pool", 20, 50);
 
     }
 
@@ -496,7 +497,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                                 + "[member] %s [application-id] %s", 
instanceContext.getClusterId(),
                         instanceContext.getClusterInstanceId(), memberId, 
applicationId));
             }
-            executorService.execute(new InstanceCreator(memberContext, 
iaasProvider, payload.toString().getBytes()));
+            executor.execute(new InstanceCreator(memberContext, iaasProvider, 
payload.toString().getBytes()));
 
             return memberContext;
         } catch (Exception e) {
@@ -675,7 +676,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                         }
                     }
                 }
-                executorService.execute(new InstanceTerminator(memberContext));
+                executor.execute(new InstanceTerminator(memberContext));
             } finally {
                 TopologyHolder.releaseWriteLock();
             }
@@ -726,7 +727,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         }
 
         for (MemberContext memberContext : memberContexts) {
-            executorService.execute(new InstanceTerminator(memberContext));
+            executor.execute(new InstanceTerminator(memberContext));
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
index 8107e1b..0d38967 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -35,7 +35,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * MemberInfoPublisher to publish member information/metadata to DAS.
@@ -48,12 +48,13 @@ public class DASMemberInformationPublisher extends 
MemberInformationPublisher {
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
     private static final String VALUE_NOT_FOUND = "Value Not Found";
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     private DASMemberInformationPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
-                CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executor = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                
((int)Math.ceil(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE/3)), 
CloudControllerConstants
+                        .STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberInformationPublisher getInstance() {
@@ -161,7 +162,7 @@ public class DASMemberInformationPublisher extends 
MemberInformationPublisher {
                 }
             }
         };
-        executorService.execute(publisher);
+        executor.execute(publisher);
     }
 
     public static String handleNull(String param) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
index 6bb6251..f28d2c8 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -29,7 +29,7 @@ import org.wso2.carbon.databridge.commons.StreamDefinition;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * Publishing member status to DAS.
@@ -41,12 +41,13 @@ public class DASMemberStatusPublisher extends 
MemberStatusPublisher {
     private static final String DATA_STREAM_NAME = "member_lifecycle";
     private static final String VERSION = "1.0.0";
     private static final String DAS_THRIFT_CLIENT_NAME = "das";
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     private DASMemberStatusPublisher() {
         super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
-        executorService = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
-                CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE);
+        executor = 
StratosThreadPool.getExecutorService(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_ID,
+                
((int)Math.ceil(CloudControllerConstants.STATS_PUBLISHER_THREAD_POOL_SIZE/3)), 
CloudControllerConstants
+                        .STATS_PUBLISHER_THREAD_POOL_SIZE);
     }
 
     public static DASMemberStatusPublisher getInstance() {
@@ -133,7 +134,7 @@ public class DASMemberStatusPublisher extends 
MemberStatusPublisher {
                 publish(payload.toArray());
             }
         };
-        executorService.execute(publisher);
+        executor.execute(publisher);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java
index b6abbf3..98ddd37 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadFactory.java
@@ -19,7 +19,9 @@
 
 package org.apache.stratos.common.threading;
 
-public class StratosThreadFactory {
+import java.util.concurrent.ThreadFactory;
+
+public class StratosThreadFactory implements ThreadFactory {
     private String prefix;
     private int counter;
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index d4531e2..459cd1d 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -33,7 +33,7 @@ public class StratosThreadPool {
 
     private static final Log log = LogFactory.getLog(StratosThreadPool.class);
 
-    private static Map<String, ExecutorService> executorServiceMap = new 
ConcurrentHashMap<String, ExecutorService>();
+    private static Map<String, ThreadPoolExecutor> executorMap = new 
ConcurrentHashMap<>();
     private static Map<String, ScheduledExecutorService> scheduledServiceMap = 
new ConcurrentHashMap<String, ScheduledExecutorService>();
     private static Object executorServiceMapLock = new Object();
     private static Object scheduledServiceMapLock = new Object();
@@ -42,21 +42,24 @@ public class StratosThreadPool {
      * Return the executor service based on the identifier and thread pool size
      *
      * @param identifier     Thread pool identifier name
-     * @param threadPoolSize Thread pool size
+     * @param maxSize Thread pool size
      * @return ExecutorService
      */
-    public static ExecutorService getExecutorService(String identifier, int 
threadPoolSize) {
-        ExecutorService executorService = executorServiceMap.get(identifier);
-        if (executorService == null) {
+    public static ThreadPoolExecutor getExecutorService(String identifier, int 
initialSize, int
+            maxSize) {
+        ThreadPoolExecutor executor = executorMap.get(identifier);
+        if (executor == null) {
             synchronized (executorServiceMapLock) {
-                if (executorService == null) {
-                    executorService = 
Executors.newFixedThreadPool(threadPoolSize);
-                    executorServiceMap.put(identifier, executorService);
-                    log.info(String.format("Thread pool created: [type] 
Executor Service [id] %s [size] %d", identifier, threadPoolSize));
+                if (executor == null) {
+                    executor = new ThreadPoolExecutor(initialSize, maxSize, 
60L, TimeUnit.SECONDS,
+                            new LinkedBlockingQueue<Runnable>(), new 
StratosThreadFactory(identifier));
+                    executorMap.put(identifier, executor);
+                    log.info(String.format("Thread pool created: [type] 
Executor [id] %s " +
+                            "[initial size] %d [max size] %d", identifier, 
initialSize, maxSize));
                 }
             }
         }
-        return executorService;
+        return executor;
     }
 
     /**
@@ -71,7 +74,8 @@ public class StratosThreadPool {
         if (scheduledExecutorService == null) {
             synchronized (scheduledServiceMapLock) {
                 if (scheduledExecutorService == null) {
-                    scheduledExecutorService = 
Executors.newScheduledThreadPool(threadPoolSize);
+                    scheduledExecutorService = 
Executors.newScheduledThreadPool(threadPoolSize,
+                            new StratosThreadFactory(identifier));
                     scheduledServiceMap.put(identifier, 
scheduledExecutorService);
                     log.info(String.format("Thread pool created: [type] 
Scheduled Executor Service [id] %s [size] %d",
                             identifier, threadPoolSize));

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index ec1ddbc..838f0fc 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -89,10 +89,10 @@ public class LoadBalancerExtension {
             }
 
             // Start topology receiver thread
-            startTopologyEventReceiver(executorService, topologyProvider);
-            startApplicationEventReceiver(executorService);
-            startApplicationSignUpEventReceiver(executorService, 
topologyProvider);
-            startDomainMappingEventReceiver(executorService, topologyProvider);
+            startTopologyEventReceiver(topologyProvider);
+            startApplicationEventReceiver();
+            startApplicationSignUpEventReceiver(topologyProvider);
+            startDomainMappingEventReceiver(topologyProvider);
 
             if (statsReader != null) {
                 // Start stats notifier thread
@@ -115,17 +115,16 @@ public class LoadBalancerExtension {
     /**
      * Start topology event receiver thread.
      *
-     * @param executorService  executor service instance
      * @param topologyProvider topology provider instance
      */
-    private void startTopologyEventReceiver(ExecutorService executorService, 
TopologyProvider topologyProvider) {
+    private void startTopologyEventReceiver(TopologyProvider topologyProvider) 
{
         // Enforcing the listener order in order execute extension listener 
later
         topologyEventReceiver = new 
LoadBalancerCommonTopologyEventReceiver(topologyProvider, false);
         // Add load-balancer extension event listener
         addTopologyEventListeners(topologyEventReceiver);
         // Add default topology provider event listeners
         topologyEventReceiver.addEventListeners();
-//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.setExecutorService(executor);
 //        topologyEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Topology receiver thread started");
@@ -149,9 +148,9 @@ public class LoadBalancerExtension {
         }
     }
 
-    private void startApplicationEventReceiver(ExecutorService 
executorService) {
+    private void startApplicationEventReceiver() {
         applicationsEventReceiver = ApplicationsEventReceiver.getInstance();
-//        applicationsEventReceiver.setExecutorService(executorService);
+//        applicationsEventReceiver.setExecutorService(executor);
 //        applicationsEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Application event receiver thread started");
@@ -164,14 +163,14 @@ public class LoadBalancerExtension {
      * @param executorService  executor service instance
      * @param topologyProvider topology receiver instance
      */
-    private void startDomainMappingEventReceiver(ExecutorService 
executorService, TopologyProvider topologyProvider) {
+    private void startDomainMappingEventReceiver(TopologyProvider 
topologyProvider) {
         // Enforcing the listener order in order execute extension listener 
later
         domainMappingEventReceiver = new 
LoadBalancerCommonDomainMappingEventReceiver(topologyProvider, false);
         // Add extension event listeners
         addDomainMappingsEventListeners(domainMappingEventReceiver);
         // Add default domain mapping event listeners
         domainMappingEventReceiver.addEventListeners();
-//        domainMappingEventReceiver.setExecutorService(executorService);
+//        domainMappingEventReceiver.setExecutorService(executor);
 //        domainMappingEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Domain mapping event receiver thread started");
@@ -198,10 +197,9 @@ public class LoadBalancerExtension {
     /**
      * Start application signup event receiver thread.
      *
-     * @param executorService  executor service instance
      * @param topologyProvider topology provider instance
      */
-    private void startApplicationSignUpEventReceiver(ExecutorService 
executorService, TopologyProvider topologyProvider) {
+    private void startApplicationSignUpEventReceiver(TopologyProvider 
topologyProvider) {
         applicationSignUpEventReceiver = new 
LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
         if (log.isInfoEnabled()) {
             log.info("Application signup event receiver thread started");

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3786af8..cb2297a 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -64,6 +64,7 @@ import 
org.wso2.carbon.utils.multitenancy.MultitenantConstants;
 import java.io.File;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * @scr.component 
name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" 
immediate="true"
@@ -90,7 +91,7 @@ public class LoadBalancerServiceComponent {
     private static final Log log = 
LogFactory.getLog(LoadBalancerServiceComponent.class);
 
     private boolean activated = false;
-    private ExecutorService executorService;
+    //private ThreadPoolExecutor executor;
     private LoadBalancerTopologyEventReceiver topologyEventReceiver;
     private TenantEventReceiver tenantEventReceiver;
     private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
@@ -126,8 +127,9 @@ public class LoadBalancerServiceComponent {
 
             int threadPoolSize = 
Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY,
                     
LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
-            executorService = 
StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
-                    threadPoolSize);
+           // executor = 
StratosThreadPool.getExecutorService(LoadBalancerConstants
+            //                .LOAD_BALANCER_THREAD_POOL_ID,
+            //        ((int)Math.ceil(threadPoolSize/3)), threadPoolSize);
 
             TopologyProvider topologyProvider = 
LoadBalancerConfiguration.getInstance().getTopologyProvider();
             if (topologyProvider == null) {
@@ -137,18 +139,18 @@ public class LoadBalancerServiceComponent {
 
             if (configuration.isMultiTenancyEnabled() || 
configuration.isDomainMappingEnabled()) {
                 // Start tenant & application signup event receivers
-                startTenantEventReceiver(executorService);
-                startApplicationSignUpEventReceiver(executorService, 
topologyProvider);
+                startTenantEventReceiver();
+                startApplicationSignUpEventReceiver(topologyProvider);
             }
 
             if (configuration.isDomainMappingEnabled()) {
                 // Start domain mapping event receiver
-                startDomainMappingEventReceiver(executorService, 
topologyProvider);
+                startDomainMappingEventReceiver(topologyProvider);
             }
 
             if (configuration.isTopologyEventListenerEnabled()) {
                 // Start topology receiver
-                startTopologyEventReceiver(executorService, topologyProvider);
+                startTopologyEventReceiver(topologyProvider);
             }
 
             if (configuration.isCepStatsPublisherEnabled()) {
@@ -167,39 +169,39 @@ public class LoadBalancerServiceComponent {
         }
     }
 
-    private void startDomainMappingEventReceiver(ExecutorService 
executorService, TopologyProvider topologyProvider) {
+    private void startDomainMappingEventReceiver( TopologyProvider 
topologyProvider) {
         if (domainMappingEventReceiver != null) {
             return;
         }
 
         domainMappingEventReceiver = new 
LoadBalancerDomainMappingEventReceiver(topologyProvider);
-//        domainMappingEventReceiver.setExecutorService(executorService);
+//        domainMappingEventReceiver.setExecutorService(executor);
 //        domainMappingEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Domain mapping event receiver thread started");
 //        }
     }
 
-    private void startApplicationSignUpEventReceiver(ExecutorService 
executorService, TopologyProvider topologyProvider) {
+    private void startApplicationSignUpEventReceiver(TopologyProvider 
topologyProvider) {
         if (applicationSignUpEventReceiver != null) {
             return;
         }
 
         applicationSignUpEventReceiver = new 
LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-//        applicationSignUpEventReceiver.setExecutorService(executorService);
+//        applicationSignUpEventReceiver.setExecutorService(executor);
 //        applicationSignUpEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Application signup event receiver thread started");
         }
     }
 
-    private void startTopologyEventReceiver(ExecutorService executorService, 
TopologyProvider topologyProvider) {
+    private void startTopologyEventReceiver(TopologyProvider topologyProvider) 
{
         if (topologyEventReceiver != null) {
             return;
         }
 
         topologyEventReceiver = new 
LoadBalancerTopologyEventReceiver(topologyProvider);
-//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.setExecutorService(executor);
 //        topologyEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Topology receiver thread started");
@@ -223,10 +225,10 @@ public class LoadBalancerServiceComponent {
         }
     }
 
-    private void startTenantEventReceiver(ExecutorService executorService) {
+    private void startTenantEventReceiver() {
 
         tenantEventReceiver = TenantEventReceiver.getInstance();
-//        tenantEventReceiver.setExecutorService(executorService);
+//        tenantEventReceiver.setExecutorService(executor);
 //        tenantEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Tenant event receiver thread started");
@@ -293,13 +295,13 @@ public class LoadBalancerServiceComponent {
         }
 
         // Shutdown executor service
-        if (executorService != null) {
-            try {
-                executorService.shutdownNow();
-            } catch (Exception e) {
-                log.warn("An error occurred while shutting down load balancer 
executor service", e);
-            }
-        }
+//        if (executor != null) {
+//            try {
+//                executor.shutdownNow();
+//            } catch (Exception e) {
+//                log.warn("An error occurred while shutting down load 
balancer executor service", e);
+//            }
+//        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
index 04a0756..e625ec7 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
@@ -24,6 +24,7 @@ import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * An executor service to asynchronously execute statistics update calls 
without blocking the
@@ -35,11 +36,12 @@ public class LoadBalancerStatisticsExecutor {
 
     private static volatile LoadBalancerStatisticsExecutor instance;
 
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
 
     private LoadBalancerStatisticsExecutor() {
-        executorService = 
StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
-                LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
+        executor = 
StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
+                
((int)Math.ceil(LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE/3)),
 LoadBalancerConstants
+                        .LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
     }
 
     public static LoadBalancerStatisticsExecutor getInstance() {
@@ -53,7 +55,7 @@ public class LoadBalancerStatisticsExecutor {
         return instance;
     }
 
-    public ExecutorService getService() {
-        return executorService;
+    public ThreadPoolExecutor getService() {
+        return executor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index c4d68ae..0486d84 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -50,6 +50,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -92,7 +93,7 @@ public class StratosManagerServiceComponent {
     private StratosManagerInstanceStatusEventReceiver 
instanceStatusEventReceiver;
     private StratosManagerApplicationEventReceiver applicationEventReceiver;
     private StratosManagerInitializerTopicReceiver initializerTopicReceiver;
-    private ExecutorService executorService;
+    private ThreadPoolExecutor executor;
     private ScheduledExecutorService scheduler;
 
     protected void activate(final ComponentContext componentContext) throws 
Exception {
@@ -100,7 +101,8 @@ public class StratosManagerServiceComponent {
             log.debug("Activating StratosManagerServiceComponent...");
         }
         try {
-            executorService = 
StratosThreadPool.getExecutorService(THREAD_POOL_ID, THREAD_POOL_SIZE);
+            executor = StratosThreadPool.getExecutorService(THREAD_POOL_ID, 
((int)Math.ceil(THREAD_POOL_SIZE/3))
+                    , THREAD_POOL_SIZE);
             scheduler = StratosThreadPool
                     .getScheduledExecutorService(SCHEDULER_THREAD_POOL_ID, 
SCHEDULER_THREAD_POOL_SIZE);
 
@@ -141,7 +143,7 @@ public class StratosManagerServiceComponent {
                                 }
                             };
                             coordinatorElectorThread.setName("Stratos manager 
coordinator elector thread");
-                            executorService.submit(coordinatorElectorThread);
+                            executor.submit(coordinatorElectorThread);
                         } else {
                             executeCoordinatorTasks(componentContext);
                         }
@@ -193,7 +195,7 @@ public class StratosManagerServiceComponent {
 
     private void initializeInitializerEventReceiver() {
         initializerTopicReceiver = new 
StratosManagerInitializerTopicReceiver();
-//        initializerTopicReceiver.setExecutorService(executorService);
+//        initializerTopicReceiver.setExecutorService(executor);
 //        initializerTopicReceiver.execute();
     }
 
@@ -202,7 +204,7 @@ public class StratosManagerServiceComponent {
      */
     private void initializeInstanceStatusEventReceiver() {
         instanceStatusEventReceiver = new 
StratosManagerInstanceStatusEventReceiver();
-//        instanceStatusEventReceiver.setExecutorService(executorService);
+//        instanceStatusEventReceiver.setExecutorService(executor);
 //        instanceStatusEventReceiver.execute();
     }
 
@@ -211,7 +213,7 @@ public class StratosManagerServiceComponent {
      */
     private void initializeTopologyEventReceiver() {
         topologyEventReceiver = new StratosManagerTopologyEventReceiver();
-//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.setExecutorService(executor);
 //        topologyEventReceiver.execute();
     }
 
@@ -220,7 +222,7 @@ public class StratosManagerServiceComponent {
      */
     private void initializeApplicationEventReceiver() {
         applicationEventReceiver = new 
StratosManagerApplicationEventReceiver();
-//        applicationEventReceiver.setExecutorService(executorService);
+//        applicationEventReceiver.setExecutorService(executor);
 //        applicationEventReceiver.execute();
     }
 
@@ -337,25 +339,33 @@ public class StratosManagerServiceComponent {
         
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
         
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
 
-        shutdownExecutorService(THREAD_POOL_ID);
+        shutdownExecutor(THREAD_POOL_ID);
         shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
     }
 
-    private void shutdownExecutorService(String executorServiceId) {
-        ExecutorService executorService = 
StratosThreadPool.getExecutorService(executorServiceId, 1);
-        if (executorService != null) {
-            shutdownExecutorService(executorService);
+    private void shutdownExecutor(String executorServiceId) {
+        ThreadPoolExecutor executor = 
StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+        if (executor != null) {
+            shutdownExecutor(executor);
         }
     }
 
     private void shutdownScheduledExecutorService(String executorServiceId) {
         ExecutorService executorService = 
StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
         if (executorService != null) {
-            shutdownExecutorService(executorService);
+            shutdownExecutor(executorService);
+        }
+    }
+
+    private void shutdownExecutor(ThreadPoolExecutor executor) {
+        try {
+            executor.shutdownNow();
+        } catch (Exception e) {
+            log.warn("An error occurred while shutting down executor service", 
e);
         }
     }
 
-    private void shutdownExecutorService(ExecutorService executorService) {
+    private void shutdownExecutor(ExecutorService executorService) {
         try {
             executorService.shutdownNow();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
index c08e8e4..5f7bc53 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInitializerTopicReceiver.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
 public class StratosManagerInitializerTopicReceiver {
     private static final Log log = 
LogFactory.getLog(StratosManagerInitializerTopicReceiver.class);
     private InitializerEventReceiver initializerEventReceiver;
-    //private ExecutorService executorService;
+    //private ExecutorService executor;
     private ApplicationSignUpHandler applicationSignUpHandler;
 
     public StratosManagerInitializerTopicReceiver() {
@@ -43,7 +43,7 @@ public class StratosManagerInitializerTopicReceiver {
     }
 
 //    public void execute() {
-//        initializerEventReceiver.setExecutorService(executorService);
+//        initializerEventReceiver.setExecutorService(executor);
 //        initializerEventReceiver.execute();
 //        if (log.isInfoEnabled()) {
 //            log.info("Stratos manager initializer topic receiver started");
@@ -82,10 +82,10 @@ public class StratosManagerInitializerTopicReceiver {
     }
 
 //    public ExecutorService getExecutorService() {
-//        return executorService;
+//        return executor;
 //    }
 //
-//    public void setExecutorService(ExecutorService executorService) {
-//        this.executorService = executorService;
+//    public void setExecutorService(ExecutorService executor) {
+//        this.executor = executor;
 //    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..0b07940 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -19,11 +19,11 @@
 
 package org.apache.stratos.messaging.message.receiver;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 public class StratosEventReceiver {
 
-    protected ExecutorService executorService;
+    protected ThreadPoolExecutor executor;
 
     public StratosEventReceiver () {
     }

Reply via email to