Use ScheduledThreadPoolExecutor instead of ScheduledExecutorService in StratosThreadPool
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4b15a1b8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4b15a1b8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4b15a1b8 Branch: refs/heads/stratos-4.1.x Commit: 4b15a1b8fbab3df07d9a4c542ab5286add84380c Parents: 90008dd Author: Isuru Haththotuwa <[email protected]> Authored: Mon Dec 7 10:45:30 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Mon Dec 7 18:49:56 2015 +0530 ---------------------------------------------------------------------- .../internal/AutoscalerServiceComponent.java | 57 ++++++------ .../monitor/cluster/ClusterMonitor.java | 2 +- .../component/ParentComponentMonitor.java | 2 +- .../context/CloudControllerContext.java | 6 +- .../CloudControllerServiceComponent.java | 21 ++--- .../common/concurrent/locks/ReadWriteLock.java | 5 +- .../threading/GracefulThreadPoolTerminator.java | 58 ++++++++++++ .../common/threading/StratosThreadPool.java | 96 +++++--------------- .../internal/LoadBalancerServiceComponent.java | 2 - .../LoadBalancerStatisticsExecutor.java | 1 - .../StratosManagerServiceComponent.java | 69 +++++++------- .../mock/iaas/services/impl/MockInstance.java | 8 +- .../MockHealthStatisticsGenerator.java | 9 +- 13 files changed, 165 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java index 8219f71..9b1a290 100644 
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java @@ -57,10 +57,7 @@ import org.wso2.carbon.utils.ConfigurationContextService; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServiceComponent" immediate="true" @@ -87,7 +84,7 @@ public class AutoscalerServiceComponent { private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver; private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver; private ThreadPoolExecutor executor; - private ScheduledExecutorService scheduler; + private ScheduledThreadPoolExecutor scheduler; protected void activate(ComponentContext componentContext) throws Exception { if (log.isDebugEnabled()) { @@ -263,39 +260,39 @@ public class AutoscalerServiceComponent { } // Shutdown executor service - shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID); + //shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID); // Shutdown scheduler - shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID); + //shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID); // Shutdown application monitor executor service - shutdownExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID); + //shutdownExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID); // Shutdown cluster monitor scheduler executor service - shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID); + 
//shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID); } - private void shutdownExecutorService(String executorServiceId) { - ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); - if (executor != null) { - shutdownExecutorService(executor); - } - } - - private void shutdownScheduledExecutorService(String executorServiceId) { - ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); - if (executorService != null) { - shutdownExecutorService(executorService); - } - } - - private void shutdownExecutorService(ExecutorService executorService) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down executor service", e); - } - } +// private void shutdownExecutorService(String executorServiceId) { +// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); +// if (executor != null) { +// shutdownExecutorService(executor); +// } +// } +// +// private void shutdownScheduledExecutorService(String executorServiceId) { +// ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); +// if (scheduledExecutor != null) { +// shutdownExecutorService(scheduledExecutor); +// } +// } +// +// private void shutdownExecutorService(ExecutorService executorService) { +// try { +// executorService.shutdownNow(); +// } catch (Exception e) { +// log.warn("An error occurred while shutting down executor service", e); +// } +// } protected void setRegistryService(RegistryService registryService) { if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index 32bf037..ead9130 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -86,7 +86,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ClusterMonitor extends Monitor { private static final Log log = LogFactory.getLog(ClusterMonitor.class); - private final ScheduledExecutorService scheduler; + private final ScheduledThreadPoolExecutor scheduler; private final ThreadPoolExecutor executor; protected boolean hasFaultyMember = false; protected ClusterContext clusterContext; http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java index 1366a3f..9efdf7c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java @@ -68,7 +68,7 @@ public abstract class ParentComponentMonitor extends Monitor { private static final Log log = LogFactory.getLog(ParentComponentMonitor.class); //Scheduler executor service to execute this 
monitor in a thread - private final ScheduledExecutorService scheduler = StratosThreadPool.getScheduledExecutorService( + private final ScheduledThreadPoolExecutor scheduler = StratosThreadPool.getScheduledExecutorService( "autoscaler.monitor.scheduler.thread.pool", 100); //The monitors dependency tree with all the start-able/kill-able dependencies protected DependencyTree startupDependencyTree; http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java index 0771d5a..9858410 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java @@ -42,9 +42,7 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.io.Serializable; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Lock; /** @@ -136,8 +134,8 @@ public class CloudControllerContext implements Serializable { /** * Thread pool used in this task to execute parallel tasks. 
*/ - private transient ThreadPoolExecutor executor = StratosThreadPool - .getExecutorService("cloud.controller.context.thread.pool", 5, 10); +// private transient ThreadPoolExecutor executor = StratosThreadPool +// .getExecutorService("cloud.controller.context.thread.pool", 5, 10); /** * Map of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 710e400..0d02d46 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -47,10 +47,7 @@ import org.wso2.carbon.registry.core.service.RegistryService; import org.wso2.carbon.registry.core.session.UserRegistry; import org.wso2.carbon.utils.ConfigurationContextService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Registering Cloud Controller Service. 
@@ -84,7 +81,7 @@ public class CloudControllerServiceComponent { private ApplicationEventReceiver applicationEventReceiver; private InitializerTopicReceiver initializerTopicReceiver; private ThreadPoolExecutor executor; - private ScheduledExecutorService scheduler; + private ScheduledThreadPoolExecutor scheduler; protected void activate(final ComponentContext context) { if (log.isDebugEnabled()) { @@ -264,18 +261,18 @@ public class CloudControllerServiceComponent { } // Shutdown executor service - shutdownExecutorService(THREAD_POOL_ID); + //shutdownExecutorService(THREAD_POOL_ID); // Shutdown scheduler shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID); } - private void shutdownExecutorService(String executorServiceId) { - ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); - if (executor != null) { - shutdownExecutorService(executor); - } - } +// private void shutdownExecutorService(String executorServiceId) { +// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); +// if (executor != null) { +// shutdownExecutorService(executor); +// } +// } private void shutdownScheduledExecutorService(String executorServiceId) { ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java index 6d91330..90c2f22 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java +++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -62,9 +63,9 @@ public class ReadWriteLock { readWriteLockMonitorInterval = Integer.getInteger("read.write.lock.monitor.interval", 30000); threadPoolSize = Integer.getInteger(READ_WRITE_LOCK_MONITOR_THREAD_POOL_SIZE_KEY, 10); - ScheduledExecutorService scheduledExecutorService = StratosThreadPool.getScheduledExecutorService( + ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService( READ_WRITE_LOCK_MONITOR_THREAD_POOL, threadPoolSize); - scheduledExecutorService.scheduleAtFixedRate(new ReadWriteLockMonitor(this), + scheduledExecutor.scheduleAtFixedRate(new ReadWriteLockMonitor(this), readWriteLockMonitorInterval, readWriteLockMonitorInterval, TimeUnit.MILLISECONDS); if (log.isDebugEnabled()) { log.debug(String.format("Lock monitor scheduled: [lock-name] %s [interval] %d seconds", http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java new file mode 100644 index 0000000..70cda66 --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java @@ -0,0 +1,58 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.threading; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class GracefulThreadPoolTerminator implements Callable<String> { + + private static final Log log = LogFactory.getLog(GracefulThreadPoolTerminator.class); + + private String threadPoolId; + private ThreadPoolExecutor executor; + + public GracefulThreadPoolTerminator (String threadPoolId, ThreadPoolExecutor executor) { + this.threadPoolId = threadPoolId; + this.executor = executor; + } + + @Override + public String call() { + // try to shut down gracefully + executor.shutdown(); + // wait 10 secs till terminated + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Thread Pool [id] " + threadPoolId + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + // interrupted, shutdown now + executor.shutdownNow(); + } + return threadPoolId; + } +} 
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 50b53bb..4eb8304 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -36,16 +36,16 @@ public class StratosThreadPool { private static final Log log = LogFactory.getLog(StratosThreadPool.class); private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>(); - private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>(); - private static Object executorServiceMapLock = new Object(); - private static Object scheduledServiceMapLock = new Object(); + private static Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>(); + private static final Object executorServiceMapLock = new Object(); + private static final Object scheduledServiceMapLock = new Object(); /** - * Return the executor service based on the identifier and thread pool size + * Return the executor based on the identifier and thread pool size * * @param identifier Thread pool identifier name * @param maxSize Thread pool size - * @return ExecutorService + * @return ThreadPoolExecutor */ public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int maxSize) { @@ -69,23 +69,23 @@ public class StratosThreadPool { * * @param identifier Thread pool identifier name * @param threadPoolSize Thread 
pool size - * @return + * @return ScheduledThreadPoolExecutor */ - public static ScheduledExecutorService getScheduledExecutorService(String identifier, int threadPoolSize) { - ScheduledExecutorService scheduledExecutorService = scheduledServiceMap.get(identifier); - if (scheduledExecutorService == null) { + public static ScheduledThreadPoolExecutor getScheduledExecutorService(String identifier, int threadPoolSize) { + ScheduledThreadPoolExecutor scheduledExecutor = scheduledExecutorMap.get(identifier); + if (scheduledExecutor == null) { synchronized (scheduledServiceMapLock) { - if (scheduledExecutorService == null) { - scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize, + if (scheduledExecutor == null) { + scheduledExecutor = new ScheduledThreadPoolExecutor(threadPoolSize, new StratosThreadFactory(identifier)); - scheduledServiceMap.put(identifier, scheduledExecutorService); - log.info(String.format("Thread pool created: [type] Scheduled Executor Service [id] %s [size] %d", + scheduledExecutorMap.put(identifier, scheduledExecutor); + log.info(String.format("Thread pool created: [type] Scheduled Executor [id] %s [size] %d", identifier, threadPoolSize)); } } } - return scheduledExecutorService; + return scheduledExecutor; } public static void shutDownAllThreadPoolsGracefully () { @@ -102,7 +102,8 @@ public class StratosThreadPool { try { threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount); for (Map.Entry<String, ThreadPoolExecutor> entry : executorMap.entrySet()) { - threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulThreadPoolTerminator(entry.getKey(), + threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new + GracefulThreadPoolTerminator(entry.getKey(), entry.getValue()))); } // use the Future to block until shutting down is done @@ -130,7 +131,7 @@ public class StratosThreadPool { public static void shutDownAllScheduledExecutorsGracefully () { - int threadPoolCount = 
scheduledServiceMap.size(); + int threadPoolCount = scheduledExecutorMap.size(); if (threadPoolCount == 0) { log.info("No thread pools found to shut down"); return; @@ -141,10 +142,9 @@ public class StratosThreadPool { try { threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount); - for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) { - threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulScheduledThreadPoolTerminator(entry.getKey(), + for (Map.Entry<String, ScheduledThreadPoolExecutor> entry : scheduledExecutorMap.entrySet()) + threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulThreadPoolTerminator(entry.getKey(), entry.getValue()))); - } // use the Future to block until shutting down is done for (Future<String> threadPoolTerminatorFuture : threadPoolTerminatorFutures) { removeScheduledThreadPoolFromCache(threadPoolTerminatorFuture.get()); @@ -154,10 +154,12 @@ public class StratosThreadPool { log.error("Error in shutting down thread pools", e); } catch (ExecutionException e) { log.error("Error in shutting down thread pools", e); + } catch (Exception e) { + log.error("Error in shutting down thread pools", e); } finally { // if there are any remaining thread pools, shut down immediately - if (!scheduledServiceMap.isEmpty()) { - for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) { + if (!scheduledExecutorMap.isEmpty()) { + for (Map.Entry<String, ScheduledThreadPoolExecutor> entry : scheduledExecutorMap.entrySet()) { entry.getValue().shutdownNow(); removeScheduledThreadPoolFromCache(entry.getKey()); } @@ -176,59 +178,9 @@ public class StratosThreadPool { } private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) { - if (scheduledServiceMap.remove(terminatedPoolId) != null) { + if (scheduledExecutorMap.remove(terminatedPoolId) != null) { log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut 
down" + " and removed from the cache"); } } - - private static class GracefulThreadPoolTerminator implements Callable { - - private String threadPoolId; - private ThreadPoolExecutor executor; - - public GracefulThreadPoolTerminator (String threadPoolId, ThreadPoolExecutor executor) { - this.threadPoolId = threadPoolId; - this.executor = executor; - } - - @Override - public String call() throws Exception { - log.info("Shutting down thread pool " + threadPoolId); - // try to shut down gracefully - executor.shutdown(); - // wait 10 secs till terminated - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { - log.info("Thread Pool [id] " + threadPoolId + " did not finish all tasks before " + - "timeout, forcefully shutting down"); - executor.shutdownNow(); - } - return threadPoolId; - } - } - - private static class GracefulScheduledThreadPoolTerminator implements Callable { - - private String threadPoolId; - private ScheduledExecutorService scheduledExecutor; - - public GracefulScheduledThreadPoolTerminator (String threadPoolId, ScheduledExecutorService scheduledExecutor) { - this.threadPoolId = threadPoolId; - this.scheduledExecutor = scheduledExecutor; - } - - @Override - public String call() throws Exception { - log.info("Shutting down scheduled thread pool " + threadPoolId); - // try to shut down gracefully - scheduledExecutor.shutdown(); - // wait 10 secs till terminated - if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - log.info("Scheduled thread Pool [id] " + threadPoolId + " did not finish all tasks before " + - "timeout, forcefully shutting down"); - scheduledExecutor.shutdownNow(); - } - return threadPoolId; - } - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index cb2297a..e7b45f8 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -63,8 +63,6 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants; import java.io.File; import java.util.Collection; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; /** * @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true" http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java index e625ec7..486b98b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java @@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.util.LoadBalancerConstants; 
-import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; /** http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index 0486d84..7de06ae 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -48,10 +48,7 @@ import org.wso2.carbon.user.core.UserStoreException; import org.wso2.carbon.user.core.service.RealmService; import org.wso2.carbon.utils.ConfigurationContextService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @scr.component name="org.wso2.carbon.hosting.mgt.internal.StratosManagerServiceComponent" @@ -94,7 +91,7 @@ public class StratosManagerServiceComponent { private StratosManagerApplicationEventReceiver applicationEventReceiver; private StratosManagerInitializerTopicReceiver initializerTopicReceiver; private ThreadPoolExecutor executor; - private ScheduledExecutorService scheduler; + private ScheduledThreadPoolExecutor scheduler; protected void activate(final ComponentContext componentContext) throws Exception { if (log.isDebugEnabled()) { @@ -339,37 +336,37 @@ public class StratosManagerServiceComponent { 
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName()); EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName()); - shutdownExecutor(THREAD_POOL_ID); - shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID); + //shutdownExecutor(THREAD_POOL_ID); + //shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID); } - private void shutdownExecutor(String executorServiceId) { - ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); - if (executor != null) { - shutdownExecutor(executor); - } - } - - private void shutdownScheduledExecutorService(String executorServiceId) { - ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); - if (executorService != null) { - shutdownExecutor(executorService); - } - } - - private void shutdownExecutor(ThreadPoolExecutor executor) { - try { - executor.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down executor service", e); - } - } - - private void shutdownExecutor(ExecutorService executorService) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down executor service", e); - } - } +// private void shutdownExecutor(String executorServiceId) { +// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1); +// if (executor != null) { +// shutdownExecutor(executor); +// } +// } + +// private void shutdownScheduledExecutorService(String executorServiceId) { +// ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); +// if (scheduledExecutor != null) { +// shutdownExecutor(scheduledExecutor); +// } +// } + +// private void shutdownExecutor(ThreadPoolExecutor executor) { +// try { +// executor.shutdownNow(); +// } catch (Exception e) { +// log.warn("An error occurred while shutting down executor service", e); 
+// } +// } + +// private void shutdownExecutor(ExecutorService executorService) { +// try { +// executorService.shutdownNow(); +// } catch (Exception e) { +// log.warn("An error occurred while shutting down executor service", e); +// } +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java index df7145e..27a664e 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java @@ -62,9 +62,9 @@ public class MockInstance implements Serializable { private final MockInstanceContext mockInstanceContext; private final AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false); - private static final ThreadPoolExecutor eventListenerExecutor = StratosThreadPool - .getExecutorService("mock.iaas.event.listener.thread.pool", 35, 100); - private static final ScheduledExecutorService healthStatNotifierExecutorService = StratosThreadPool +// private static final ThreadPoolExecutor eventListenerExecutor = StratosThreadPool +// .getExecutorService("mock.iaas.event.listener.thread.pool", 35, 100); + private static final ScheduledThreadPoolExecutor healthStatNotifierExecutor = StratosThreadPool .getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool", 100); public MockInstance(MockInstanceContext mockInstanceContext) { @@ -95,7 +95,7 @@ public class MockInstance implements Serializable { log.debug(String.format("Starting health statistics 
notifier: [member-id] %s", mockInstanceContext.getMemberId())); } - healthStatNotifierScheduledFuture = healthStatNotifierExecutorService + healthStatNotifierScheduledFuture = healthStatNotifierExecutor .scheduleAtFixedRate(mockHealthStatisticsNotifier, 0, HEALTH_STAT_INTERVAL, TimeUnit.SECONDS); if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java index 1c3da6e..367b4b3 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java @@ -26,10 +26,7 @@ import org.apache.stratos.mock.iaas.config.MockIaasConfig; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Mock health statistics generator. 
@@ -39,7 +36,7 @@ public class MockHealthStatisticsGenerator { private static final Log log = LogFactory.getLog(MockHealthStatisticsGenerator.class); private static volatile MockHealthStatisticsGenerator instance; - private static final ScheduledExecutorService scheduledExecutorService = + private static final ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService("mock.iaas.health.statistics.generator.thread.pool", 10); // Map<ServiceName, Map<ScalingFactor, ScheduledFuture>> @@ -84,7 +81,7 @@ public class MockHealthStatisticsGenerator { if (statisticsPattern.getCartridgeType().equals(serviceName) && (statisticsPattern.getSampleDuration() > 0)) { MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern); - ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0, + ScheduledFuture<?> task = scheduledExecutor.scheduleAtFixedRate(runnable, 0, statisticsPattern.getSampleDuration(), TimeUnit.SECONDS); taskList.put(statisticsPattern.getFactor().toString(), task); }
